hlteoh37 commented on code in PR #179:
URL:
https://github.com/apache/flink-connector-aws/pull/179#discussion_r1833159382
##########
docs/content/docs/connectors/table/kinesis.md:
##########
@@ -123,6 +158,430 @@ WITH (
Connector Options
-----------------
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 8%">Forwarded</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 42%">Description</th>
+ </tr>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>connector</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Specify what connector to use. For Kinesis use
<code>'kinesis'</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>stream.arn</h5></td>
+ <td>required</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>ARN of the Kinesis data stream backing this table.</td>
+ </tr>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The format used to deserialize and serialize Kinesis data stream
records. See <a href="#data-type-mapping">Data Type Mapping</a> for
details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.region</h5></td>
+ <td>required</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS region where the stream is defined.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.endpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for Kinesis (derived from the AWS region setting if
not set).</td>
+ </tr>
+ <tr>
+ <td><h5>aws.trust.all.certificates</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>If true, accepts all SSL certificates.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Authentication
Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>aws.credentials.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">AUTO</td>
+ <td>String</td>
+ <td>A credentials provider to use when authenticating against the
Kinesis endpoint. See <a href="#authentication">Authentication</a> for
details.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS access key ID to use when setting credentials provider
type to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.basic.secretkey</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS secret key to use when setting credentials provider type
to BASIC.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.path</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for profile path if credential provider
type is set to be PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.profile.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Optional configuration for profile name if credential provider
type is set to be PROFILE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.arn</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role ARN to use when credential provider type is set to
ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.sessionName</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The role session name to use when credential provider type is set
to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.externalId</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The external ID to use when credential provider type is set to
ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.stsEndpoint</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The AWS endpoint for STS (derived from the AWS region setting if
not set) to use when credential provider type is set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.role.provider</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The credentials provider that provides credentials for assuming
the role when credential provider type is set to ASSUME_ROLE. Roles can be
nested, so this value can again be set to ASSUME_ROLE.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The absolute path to the web identity token file that should be
used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+ </tr>
+ <tr>
+ <td><h5>aws.credentials.custom.class</h5></td>
+ <td>required only if credential provider is set to CUSTOM</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The fully qualified name (in Java package notation) of the user-provided
+ class to use if credential provider type is set to CUSTOM, e.g.
org.user_company.auth.CustomAwsCredentialsProvider.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>source.init.position</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">LATEST</td>
+ <td>String</td>
+ <td>Initial position to be used when reading from the table. See <a
href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>The initial timestamp from which to start reading the Kinesis stream (when
<code>source.init.position</code> is AT_TIMESTAMP). See <a
href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.init.timestamp.format</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+ <td>String</td>
+ <td>The date format of the initial timestamp from which to start reading the Kinesis stream
(when <code>source.init.position</code> is AT_TIMESTAMP). See <a
href="#start-reading-position">Start Reading Position</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.discovery.interval</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>The interval between each attempt to discover new shards.</td>
+ </tr>
+ <tr>
+ <td><h5>source.reader.type</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">POLLING</td>
+ <td>String</td>
+ <td>The <code>ReaderType</code> to use for sources.</td>
+ </tr>
+ <tr>
+ <td><h5>source.shard.get-records.max-record-count</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>Only applicable to POLLING <code>ReaderType</code>. The maximum
number of records to request on each fetch from an AWS Kinesis
shard.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.consumer.name</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. The name of the EFO
consumer to register with KDS.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.lifecycle</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">JOB_MANAGED</td>
+ <td>String</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Determines whether the EFO
consumer is managed by the Flink job. Valid values: <code>JOB_MANAGED|SELF_MANAGED</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.subscription.timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">60 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Timeout for EFO
Consumer subscription.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.deregister.timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Timeout for consumer
deregistration. When the timeout is reached, execution continues as normal.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.attempts.max</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">100</td>
+ <td>Integer</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Maximum number of
attempts for the exponential backoff retry strategy when calling
<code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.delay.min</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">2 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Base delay for the
exponential backoff retry strategy when calling
<code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ <tr>
+ <td><h5>source.efo.describe.retry-strategy.delay.max</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">60 s</td>
+ <td>Duration</td>
+ <td>Only applicable to EFO <code>ReaderType</code>. Max delay for the
exponential backoff retry strategy when calling
<code>DescribeStreamConsumer</code>.</td>
+ </tr>
+ </tbody>
+ <thead>
+ <tr>
+ <th colspan="6" class="text-left" style="width: 100%">Sink Options</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>sink.partitioner</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">random or row-based</td>
+ <td>String</td>
+ <td>Optional output partitioning from Flink's partitions into Kinesis
shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.partitioner-field-delimiter</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">|</td>
+ <td>String</td>
+ <td>Optional field delimiter for a fields-based partitioner derived from
a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a>
for details.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.producer.*</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td></td>
+ <td>
+ Deprecated options previously used by the legacy connector.
+ Options with equivalent alternatives in
<code>KinesisStreamsSink</code> are matched
+ to their respective properties. Unsupported options are logged as
warnings.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.max-concurrency</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>
+ Maximum number of allowed concurrent requests by
<code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.read-timeout</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">360000</td>
+ <td>Integer</td>
+ <td>
+ Maximum amount of time in ms for requests to be sent by
<code>KinesisAsyncClient</code>.
+ </td>
+ </tr>
+ <tr>
+ <td><h5>sink.http-client.protocol.version</h5></td>
+ <td>optional</td>
+ <td>no</td>
+ <td style="word-wrap: break-word;">HTTP2</td>
+ <td>String</td>
+ <td>HTTP version used by the Kinesis client.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.batch.max-size</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">500</td>
+ <td>Integer</td>
+ <td>Maximum batch size of elements to be passed to
<code>KinesisAsyncClient</code> to be written downstream.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-inflight</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">16</td>
+ <td>Integer</td>
+ <td>Request threshold for uncompleted requests by
<code>KinesisAsyncClient</code> before blocking new write requests and applying
backpressure.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.requests.max-buffered</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">10000</td>
+ <td>Integer</td>
+ <td>Request buffer threshold for buffered requests by
<code>KinesisAsyncClient</code> before blocking new write requests and applying
backpressure.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-buffer.size</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">5242880</td>
+ <td>Long</td>
+ <td>Threshold value in bytes for writer buffer in
<code>KinesisAsyncClient</code> before flushing.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.flush-buffer.timeout</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">5000</td>
+ <td>Long</td>
+ <td>Threshold time in milliseconds for an element to be in a buffer
of <code>KinesisAsyncClient</code> before flushing.</td>
+ </tr>
+ <tr>
+ <td><h5>sink.fail-on-error</h5></td>
+ <td>optional</td>
+ <td>yes</td>
+ <td style="word-wrap: break-word;">false</td>
+ <td>Boolean</td>
+ <td>Flag controlling whether failed requests are retried. If set to true, a failed
request is not retried and fails the job.</td>
+ </tr>
+ </tbody>
+</table>
+
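+The options in the table above can be combined as in the following minimal sketch of a source table that starts reading from a timestamp. The table name, columns, stream ARN, region, `json` format, and timestamp value are hypothetical placeholders; only the option keys, `AT_TIMESTAMP`, and the default timestamp format come from the table.
+
+```sql
+-- Hypothetical source table; replace the ARN, region, and schema with your own.
+CREATE TABLE orders_source (
+  order_id BIGINT,
+  item_id  BIGINT,
+  ts       TIMESTAMP(3)
+) WITH (
+  'connector' = 'kinesis',
+  'stream.arn' = 'arn:aws:kinesis:us-east-1:123456789012:stream/orders',
+  'aws.region' = 'us-east-1',
+  'format' = 'json',
+  -- Start from a point in time instead of the default LATEST.
+  'source.init.position' = 'AT_TIMESTAMP',
+  -- Matches the default source.init.timestamp.format (yyyy-MM-dd'T'HH:mm:ss.SSSXXX).
+  'source.init.timestamp' = '2024-01-01T00:00:00.000+00:00'
+);
+```
+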
+Features
+--------
+
+{{< hint info >}}
+Refer to the [Kinesis Datastream API]({{< ref
"docs/connectors/datastream/kinesis" >}}) documentation for a more detailed
description of the features.
+{{< /hint >}}
+
+### Sink Partitioning
+
+Kinesis data streams consist of one or more shards, and the `sink.partitioner`
option allows you to control how records written into a multi-shard
Kinesis-backed table will be partitioned between its shards.
+Valid values are:
+
+* `fixed`: Kinesis `PartitionKey` values are derived from the Flink subtask index,
so each Flink partition ends up in at most one Kinesis shard (assuming that
no re-sharding takes place at runtime).
+* `random`: Kinesis `PartitionKey` values are assigned randomly. This is the
default value for tables not defined with a `PARTITION BY` clause.
+* Custom `FixedKinesisPartitioner` subclass: e.g.
`'org.mycompany.MyPartitioner'`.
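+
+As a minimal sketch of how one of the values above might be selected, the following sink table sets `sink.partitioner` to `fixed`. The table name, columns, stream ARN, region, and `json` format are hypothetical placeholders.
+
+```sql
+-- Hypothetical sink table without a partitioning clause,
+-- so sink.partitioner may be set explicitly.
+CREATE TABLE orders_sink (
+  order_id BIGINT,
+  item_id  BIGINT
+) WITH (
+  'connector' = 'kinesis',
+  'stream.arn' = 'arn:aws:kinesis:us-east-1:123456789012:stream/orders-out',
+  'aws.region' = 'us-east-1',
+  'format' = 'json',
+  -- Derive PartitionKey values from the Flink subtask index.
+  'sink.partitioner' = 'fixed'
+);
+```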
+
+{{< hint info >}}
+Records written into tables defining a `PARTITION BY` clause will always be
partitioned based on a concatenated projection of the `PARTITION BY` fields.
+In this case, the `sink.partitioner` field cannot be used to modify this
behavior (attempting to do this results in a configuration error).
+You can, however, use the `sink.partitioner-field-delimiter` option to set the
delimiter of field values in the concatenated
[PartitionKey](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey)
string (an empty string is also a valid delimiter).
+{{< /hint >}}
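+
+The partitioned case described in the hint can be sketched as follows, assuming the clause is written `PARTITIONED BY` in Flink DDL. The table name, columns, stream ARN, region, `json` format, and the `;` delimiter are hypothetical placeholders; `sink.partitioner` is deliberately left unset.
+
+```sql
+-- Hypothetical partitioned sink table: the PartitionKey is built from
+-- user_id and item_id, joined with the configured delimiter.
+CREATE TABLE orders_by_user (
+  user_id  BIGINT,
+  item_id  BIGINT,
+  behavior STRING
+) PARTITIONED BY (user_id, item_id)
+WITH (
+  'connector' = 'kinesis',
+  'stream.arn' = 'arn:aws:kinesis:us-east-1:123456789012:stream/orders-by-user',
+  'aws.region' = 'us-east-1',
+  'format' = 'json',
+  -- Overrides the default '|' delimiter between field values.
+  'sink.partitioner-field-delimiter' = ';'
+);
+```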
+
+# Data Type Mapping
+
+Kinesis stores records as Base64-encoded binary data objects, so it doesn't
have a notion of internal record structure.
Review Comment:
I'm not sure actually!
Let's focus the changes for this PR on the source docs.