aalexandrov commented on a change in pull request #13770:
URL: https://github.com/apache/flink/pull/13770#discussion_r518709552



##########
File path: docs/dev/table/connectors/kinesis.md
##########
@@ -0,0 +1,334 @@
+---
+title: "Amazon Kinesis Data Streams SQL Connector"
+nav-title: Kinesis
+nav-parent_id: sql-connectors
+nav-pos: 2
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-primary">Scan Source: Unbounded</span>
+<span class="label label-primary">Sink: Streaming Append Mode</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The Kinesis connector allows for reading data from and writing data into [Amazon Kinesis Data Streams (KDS)](https://aws.amazon.com/kinesis/data-streams/).
+
+Dependencies
+------------
+
+To use the connector, add the following Maven dependency to your project:
+
+{% highlight xml %}
+<dependency>
+  <groupId>org.apache.flink</groupId>
+  <artifactId>flink-connector-kinesis{{ site.scala_version_suffix }}</artifactId>
+  <version>{{ site.version }}</version>
+</dependency>
+{% endhighlight %}
+
+How to create a Kinesis data stream table
+-----------------------------------------
+
+Follow the instructions from the [Amazon KDS Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to set up a Kinesis stream.
+The following example shows how to create a table backed by a Kinesis data stream:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ category_id BIGINT,
+ behavior STRING,
+ ts TIMESTAMP(3)
+) PARTITIONED BY (user_id, item_id) WITH (
+ 'connector' = 'kinesis',
+ 'stream' = 'user_behavior',
+ 'properties.aws.region' = 'us-east-2',
+ 'properties.flink.stream.initpos' = 'LATEST',
+ 'format' = 'csv'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Available Metadata
+------------------
+
+The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Column Name</th>
+      <th class="text-center" style="width: 45%">Column Type</th>
+      <th class="text-center" style="width: 35%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp">ApproximateArrivalTimestamp</a></code></td>
+      <td><code>TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL</code></td>
+      <td>The approximate time when the record was inserted into the stream.</td>
+    </tr>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId">ShardId</a></code></td>
+      <td><code>VARCHAR(128) NOT NULL</code></td>
+      <td>The unique identifier of the shard within the stream from which the record was read.</td>
+    </tr>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber">SequenceNumber</a></code></td>
+      <td><code>VARCHAR(128) NOT NULL</code></td>
+      <td>The unique identifier of the record within its shard.</td>
+    </tr>
+    </tbody>
+</table>
+
+The following extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata columns:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ category_id BIGINT,
+ behavior STRING,
+ ts TIMESTAMP(3),
+ arrival_time TIMESTAMP(3) METADATA FROM 'ApproximateArrivalTimestamp' VIRTUAL,
+ shard_id VARCHAR(128) NOT NULL METADATA FROM 'ShardId' VIRTUAL,
+ sequence_number VARCHAR(128) NOT NULL METADATA FROM 'SequenceNumber' VIRTUAL
+) PARTITIONED BY (user_id, item_id) WITH (
+ 'connector' = 'kinesis',
+ 'stream' = 'user_behavior',
+ 'properties.aws.region' = 'us-east-2',
+ 'properties.flink.stream.initpos' = 'LATEST',
+ 'format' = 'csv'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For Kinesis use 
<code>'kinesis'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>stream</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Kinesis data stream backing this table.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize Kinesis data stream 
records. See <a href="#data-type-mapping">Data Type Mapping</a> for 
details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.partitioner</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">random or row-based</td>
+      <td>String</td>
+      <td>Optional output partitioning from Flink's partitions into Kinesis 
shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.partitioner.field.delimiter</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">|</td>
+      <td>String</td>
+      <td>Optional field delimiter for a fields-based partitioner derived from 
a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> 
for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.aws.region</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the stream is defined. Either this option or <code>properties.aws.endpoint</code> is required.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.aws.endpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this option or <code>properties.aws.region</code> is required.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the 
Kinesis endpoint. See <a href="#authentication">Authentication</a> for 
details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.flink.stream.initpos</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">LATEST</td>
+      <td>String</td>
+      <td>Initial position to be used when reading from the table. See <a 
href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.flink.stream.recordpublisher</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">POLLING</td>
+      <td>String</td>
+      <td>The <code>RecordPublisher</code> type to use for sources. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>properties.*</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>
+      Other properties to pass to the <code>FlinkKinesisConsumer</code> or <code>FlinkKinesisProducer</code> constructors.
+      See the constants defined in 
+      <ul>
+        <li><code>AWSConfigConstants</code>,</li> 
+        <li><code>ConsumerConfigConstants</code>, and</li> 
+        <li><code>ProducerConfigConstants</code></li>
+      </ul>
+      for detailed information on the available suffixes.
+      </td>
+    </tr>
+    </tbody>
+</table>
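+
+As a sketch of the `properties.*` pass-through, the following table declaration tunes a consumer retry setting (the `flink.shard.getrecords.maxretries` suffix is assumed to match the constant defined in `ConsumerConfigConstants`; verify it against your connector version):
+
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING
+) WITH (
+ 'connector' = 'kinesis',
+ 'stream' = 'user_behavior',
+ 'properties.aws.region' = 'us-east-2',
+ 'format' = 'csv',
+ -- pass-through property; suffix assumed from ConsumerConfigConstants
+ 'properties.flink.shard.getrecords.maxretries' = '5'
+)
+{% endhighlight %}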
+
+Features
+--------
+
+### Authorization
+
+Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html) that allows reading from and writing to the Kinesis data streams.
+
+### Authentication
+
+Depending on your deployment, you will choose a different credentials provider to allow access to Kinesis.
+By default, the `AUTO` credentials provider is used.
+If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.
+
+A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `properties.aws.credentials.provider` setting.
+Supported values are:
+
+* `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and the EC2/ECS credentials provider.
+* `BASIC` - Use the access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Use the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables.
+* `SYS_PROP` - Use the Java system properties `aws.accessKeyId` and `aws.secretKey`.
+* `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using a Web Identity Token.
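+
+For example, a table using the `BASIC` provider could be declared as follows (a sketch; the `properties.aws.credentials.provider.basic.*` keys are assumed to mirror the constants in `AWSConfigConstants`, and the credential values are placeholders):
+
+{% highlight sql %}
+CREATE TABLE KinesisTable (
+ user_id BIGINT,
+ item_id BIGINT,
+ behavior STRING
+) WITH (
+ 'connector' = 'kinesis',
+ 'stream' = 'user_behavior',
+ 'properties.aws.region' = 'us-east-2',
+ 'properties.aws.credentials.provider' = 'BASIC',
+ -- placeholder credentials; key names assumed from AWSConfigConstants
+ 'properties.aws.credentials.provider.basic.accesskeyid' = 'YOUR_ACCESS_KEY_ID',
+ 'properties.aws.credentials.provider.basic.secretkey' = 'YOUR_SECRET_KEY',
+ 'format' = 'csv'
+)
+{% endhighlight %}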
+
+### Start Reading Position
+
+You can configure table sources to start reading the table-backed Kinesis data stream from a specific position through the `properties.flink.stream.initpos` option.

Review comment:
    OK, I suggest using the following options:
   
   
   <table class="table table-bordered">
       <thead>
       <tr>
         <th class="text-left" style="width: 25%">Option</th>
         <th class="text-center" style="width: 8%">Required</th>
         <th class="text-center" style="width: 7%">Default</th>
         <th class="text-center" style="width: 10%">Type</th>
         <th class="text-center" style="width: 50%">Description</th>
       </tr>
       </thead>
       <tbody>
       <tr>
         <td><h5>connector</h5></td>
         <td>required</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>Specify what connector to use. For Kinesis use 
<code>'kinesis'</code>.</td>
       </tr>
       <tr>
         <td><h5>stream</h5></td>
         <td>required</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>Name of the Kinesis data stream backing this table.</td>
       </tr>
       <tr>
         <td><h5>format</h5></td>
         <td>required</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>The format used to deserialize and serialize Kinesis data stream 
records. See <a href="#data-type-mapping">Data Type Mapping</a> for 
details.</td>
       </tr>
       <tr>
         <td><h5>sink.partitioner</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">random or row-based</td>
         <td>String</td>
         <td>Optional output partitioning from Flink's partitions into Kinesis 
shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
       </tr>
       <tr>
         <td><h5>sink.partitioner.field.delimiter</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">|</td>
         <td>String</td>
         <td>Optional field delimiter for a fields-based partitioner derived 
from a PARTITION BY clause. See <a href="#sink-partitioning">Sink 
Partitioning</a> for details.</td>
       </tr>
       <tr>
         <td><h5>aws.region</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
          <td>The AWS region where the stream is defined. Either this option or <code>aws.endpoint</code> is required.</td>
       </tr>
       <tr>
         <td><h5>aws.endpoint</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
          <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this option or <code>aws.region</code> is required.</td>
       </tr>
       <tr>
         <td><h5>aws.credentials.provider</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">AUTO</td>
         <td>String</td>
         <td>A credentials provider to use when authenticating against the 
Kinesis endpoint. See <a href="#authentication">Authentication</a> for 
details.</td>
       </tr>
       <tr>
         <td><h5>source.stream.recordpublisher</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">POLLING</td>
         <td>String</td>
         <td>The <code>RecordPublisher</code> type to use for sources. See <a 
href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
       </tr>
       <tr>
         <td><h5>source.stream.efo.consumername</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
         <td>The name of the EFO consumer to register with KDS.</td>
       </tr>
       <tr>
         <td><h5>source.stream.efo.registration</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">LAZY</td>
         <td>String</td>
          <td>Determines how and when consumer registration/deregistration is performed (<code>LAZY</code>|<code>EAGER</code>|<code>NONE</code>).</td>
       </tr>
       <tr>
         <td><h5>source.stream.efo.consumerarn</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
          <td>The prefix of the consumer ARN for a given stream.</td>
       </tr>
       <tr>
         <td><h5>source.stream.initpos</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">LATEST</td>
         <td>String</td>
         <td>Initial position to be used when reading from the table. See <a 
href="#start-reading-position">Start Reading Position</a> for details.</td>
       </tr>
       <tr>
         <td><h5>source.stream.initpos.timestamp</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td>String</td>
          <td>The initial timestamp to start reading the Kinesis stream from (when <code>AT_TIMESTAMP</code> is set as the initial stream position).</td>
       </tr>
       <tr>
         <td><h5>source.stream.initpos.timestamp.format</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
         <td>String</td>
          <td>The date format of the initial timestamp to start reading the Kinesis stream from (when <code>AT_TIMESTAMP</code> is set as the initial stream position).</td>
       </tr>
       <tr>
         <td><h5>source.stream.describe.maxretries</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">50</td>
         <td>Integer</td>
         <td>The maximum number of <code>describeStream</code> attempts if we 
get a recoverable exception.</td>
       </tr>
       <tr>
         <td><h5>source.stream.describe.backoff.base</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">2000</td>
         <td>Long</td>
         <td>The base backoff time (in milliseconds) between each 
<code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
       </tr>
       <tr>
         <td><h5>source.stream.describe.backoff.max</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">5000</td>
         <td>Long</td>
          <td>The maximum backoff time (in milliseconds) between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
       </tr>
       <tr>
         <td><h5>source.stream.describe.backoff.expconst</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1.5</td>
         <td>Double</td>
         <td>The power constant for exponential backoff between each 
<code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
       </tr>
       <tr>
         <td><h5>source.list.shards.maxretries</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">10</td>
         <td>Integer</td>
         <td>The maximum number of <code>listShards</code> attempts if we get a 
recoverable exception.</td>
       </tr>
       <tr>
         <td><h5>source.list.shards.backoff.base</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1000</td>
         <td>Long</td>
         <td>The base backoff time (in milliseconds) between each 
<code>listShards</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.list.shards.backoff.max</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">5000</td>
         <td>Long</td>
         <td>The maximum backoff time (in milliseconds) between each 
<code>listShards</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.list.shards.backoff.expconst</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1.5</td>
         <td>Double</td>
         <td>The power constant for exponential backoff between each 
<code>listShards</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.stream.describestreamconsumer.maxretries</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">50</td>
         <td>Integer</td>
         <td>The maximum number of <code>describeStreamConsumer</code> attempts 
if we get a recoverable exception.</td>
       </tr>
       <tr>
         <td><h5>source.stream.describestreamconsumer.backoff.base</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">2000</td>
         <td>Long</td>
         <td>The base backoff time (in milliseconds) between each 
<code>describeStreamConsumer</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.stream.describestreamconsumer.backoff.max</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">5000</td>
         <td>Long</td>
         <td>The maximum backoff time (in milliseconds) between each 
<code>describeStreamConsumer</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.stream.describestreamconsumer.backoff.expconst</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1.5</td>
         <td>Double</td>
         <td>The power constant for exponential backoff between each 
<code>describeStreamConsumer</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.stream.registerstreamconsumer.maxretries</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">10</td>
         <td>Integer</td>
         <td>The maximum number of <code>registerStream</code> attempts if we 
get a recoverable exception.</td>
       </tr>
       <tr>
         <td><h5>source.stream.registerstreamconsumer.timeout</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">60</td>
         <td>Integer</td>
         <td>The maximum time in seconds to wait for a stream consumer to 
become active before giving up.</td>
       </tr>
       <tr>
         <td><h5>source.stream.registerstreamconsumer.backoff.base</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">500</td>
         <td>Long</td>
         <td>The base backoff time (in milliseconds) between each 
<code>registerStream</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.stream.registerstreamconsumer.backoff.max</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">2000</td>
         <td>Long</td>
         <td>The maximum backoff time (in milliseconds) between each 
<code>registerStream</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.stream.registerstreamconsumer.backoff.expconst</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1.5</td>
         <td>Double</td>
         <td>The power constant for exponential backoff between each 
<code>registerStream</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.stream.deregisterstreamconsumer.maxretries</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">10</td>
         <td>Integer</td>
         <td>The maximum number of <code>deregisterStream</code> attempts if we 
get a recoverable exception.</td>
       </tr>
       <tr>
         <td><h5>source.stream.deregisterstreamconsumer.timeout</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">60</td>
         <td>Integer</td>
         <td>The maximum time in seconds to wait for a stream consumer to 
deregister before giving up.</td>
       </tr>
       <tr>
         <td><h5>source.stream.deregisterstreamconsumer.backoff.base</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">500</td>
         <td>Long</td>
         <td>The base backoff time (in milliseconds) between each 
<code>deregisterStream</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.stream.deregisterstreamconsumer.backoff.max</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">2000</td>
         <td>Long</td>
         <td>The maximum backoff time (in milliseconds) between each 
<code>deregisterStream</code> attempt.</td>
       </tr>
       <tr>
         
<td><h5>source.stream.deregisterstreamconsumer.backoff.expconst</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1.5</td>
         <td>Double</td>
         <td>The power constant for exponential backoff between each 
<code>deregisterStream</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.shard.subscribetoshard.maxretries</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">10</td>
         <td>Integer</td>
         <td>The maximum number of <code>subscribeToShard</code> attempts if we 
get a recoverable exception.</td>
       </tr>
       <tr>
         <td><h5>source.shard.subscribetoshard.backoff.base</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1000</td>
         <td>Long</td>
         <td>The base backoff time (in milliseconds) between each 
<code>subscribeToShard</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.shard.subscribetoshard.backoff.max</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">2000</td>
         <td>Long</td>
         <td>The maximum backoff time (in milliseconds) between each 
<code>subscribeToShard</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.shard.subscribetoshard.backoff.expconst</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1.5</td>
         <td>Double</td>
         <td>The power constant for exponential backoff between each 
<code>subscribeToShard</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.shard.getrecords.maxrecordcount</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">10000</td>
         <td>Integer</td>
          <td>The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard.</td>
       </tr>
       <tr>
         <td><h5>source.shard.getrecords.maxretries</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">3</td>
         <td>Integer</td>
         <td>The maximum number of <code>getRecords</code> attempts if we get a 
recoverable exception.</td>
       </tr>
       <tr>
         <td><h5>source.shard.getrecords.backoff.base</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">300</td>
         <td>Long</td>
         <td>The base backoff time (in milliseconds) between 
<code>getRecords</code> attempts if we get a 
ProvisionedThroughputExceededException.</td>
       </tr>
       <tr>
         <td><h5>source.shard.getrecords.backoff.max</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1000</td>
         <td>Long</td>
         <td>The maximum backoff time (in milliseconds) between 
<code>getRecords</code> attempts if we get a 
ProvisionedThroughputExceededException.</td>
       </tr>
       <tr>
         <td><h5>source.shard.getrecords.backoff.expconst</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1.5</td>
         <td>Double</td>
         <td>The power constant for exponential backoff between each 
<code>getRecords</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.shard.getrecords.intervalmillis</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">200</td>
         <td>Long</td>
          <td>The interval (in milliseconds) between each <code>getRecords</code> request to an AWS Kinesis shard.</td>
       </tr>
       <tr>
         <td><h5>source.shard.getiterator.maxretries</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">3</td>
         <td>Integer</td>
         <td>The maximum number of <code>getShardIterator</code> attempts if we 
get ProvisionedThroughputExceededException.</td>
       </tr>
       <tr>
         <td><h5>source.shard.getiterator.backoff.base</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">300</td>
         <td>Long</td>
         <td>The base backoff time (in milliseconds) between 
<code>getShardIterator</code> attempts if we get a 
ProvisionedThroughputExceededException.</td>
       </tr>
       <tr>
         <td><h5>source.shard.getiterator.backoff.max</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1000</td>
         <td>Long</td>
         <td>The maximum backoff time (in milliseconds) between 
<code>getShardIterator</code> attempts if we get a 
ProvisionedThroughputExceededException.</td>
       </tr>
       <tr>
         <td><h5>source.shard.getiterator.backoff.expconst</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">1.5</td>
         <td>Double</td>
         <td>The power constant for exponential backoff between each 
<code>getShardIterator</code> attempt.</td>
       </tr>
       <tr>
         <td><h5>source.shard.discovery.intervalmillis</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">10000</td>
         <td>Integer</td>
          <td>The interval (in milliseconds) between each attempt to discover new shards.</td>
       </tr>
       <tr>
         <td><h5>source.shard.adaptivereads</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">false</td>
         <td>Boolean</td>
         <td>The config to turn on adaptive reads from a shard. See the 
<code>AdaptivePollingRecordPublisher</code> documentation for details.</td>
       </tr>
       <tr>
         <td><h5>source.shard.idle.interval</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">-1</td>
         <td>Long</td>
         <td>The interval (in milliseconds) after which to consider a shard 
idle for purposes of watermark generation. A positive value will allow the 
watermark to progress even when some shards don't receive new records.</td>
       </tr>
       <tr>
         <td><h5>source.watermark.sync.interval</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">30000</td>
         <td>Long</td>
         <td>The interval (in milliseconds) for periodically synchronizing the 
shared watermark state.</td>
       </tr>
       <tr>
         <td><h5>source.watermark.lookahead.millis</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">0</td>
         <td>Long</td>
         <td>The maximum delta (in milliseconds) allowed for the reader to 
advance ahead of the shared global watermark.</td>
       </tr>
       <tr>
         <td><h5>source.watermark.sync.queue.capacity</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">100</td>
         <td>Integer</td>
         <td>The maximum number of records that will be buffered before 
suspending consumption of a shard.</td>
       </tr>
       <tr>
         <td><h5>source.stream.efo.http-client.max-concurrency</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">10000</td>
         <td>Integer</td>
         <td>Maximum number of allowed concurrent requests for the EFO 
client.</td>
       </tr>
       <tr>
         <td><h5>sink.*</h5></td>
         <td>optional</td>
         <td style="word-wrap: break-word;">(none)</td>
         <td></td>
         <td>
           Sink options for the <code>KinesisProducer</code>.
           Suffixes are stripped of the <code>sink.</code> prefix and passed to <a href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html#fromProperties-java.util.Properties-">KinesisProducerConfiguration#fromProperties</a>.
           Suffix names must match the <a href="https://javadoc.io/static/com.amazonaws/amazon-kinesis-producer/0.14.0/com/amazonaws/services/kinesis/producer/KinesisProducerConfiguration.html">KinesisProducerConfiguration</a> getters minus the <code>get</code> prefix (for example, <code>CollectionMaxCount</code> or <code>AggregationMaxCount</code>).
           Note that some of the KPL defaults are overwritten by <code>KinesisConfigUtil</code>.
         </td>
       </tr>
       </tbody>
   </table>
   
    Instead of duplicating code in `KinesisOptions`, I suggest using the following rules when constructing the `Properties` map passed downstream:
   
    * Option keys starting with `source.*` are replaced by the corresponding `flink.*` property. Available keys are listed in `ConsumerConfigConstants`.
    * Option keys starting with `aws.*` are kept intact. Available keys are listed in `AWSConfigConstants`.
    * Option keys starting with `sink.*` are stripped of the `sink.` prefix and passed directly to the `FlinkKinesisProducer` constructor (this is in line with the discussion in [FLINK-7367](https://issues.apache.org/jira/browse/FLINK-7367)).
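    
    To make the rules concrete, the first `CREATE TABLE` example on this page would look like this under the proposed naming (a sketch of the proposal, not of the current implementation):
    
    ```sql
    CREATE TABLE KinesisTable (
     user_id BIGINT,
     item_id BIGINT,
     behavior STRING,
     ts TIMESTAMP(3)
    ) WITH (
     'connector' = 'kinesis',
     'stream' = 'user_behavior',
     -- aws.* keys kept intact (see AWSConfigConstants)
     'aws.region' = 'us-east-2',
     -- source.* keys mapped to the corresponding flink.* properties
     'source.stream.initpos' = 'LATEST',
     'format' = 'csv'
    )
    ```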
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

