MartijnVisser commented on a change in pull request #18165: URL: https://github.com/apache/flink/pull/18165#discussion_r773875231
##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -29,9 +29,26 @@ under the License.
 The Kinesis connector provides access to [Amazon AWS Kinesis Streams](http://aws.amazon.com/kinesis/streams/).
 
-To use the connector, add the following Maven dependency to your project:
-
-{{< artifact flink-connector-kinesis >}}
+To use this connector, add one or more of the following dependencies to your project, depending on whether you are reading from and/or writing to Kinesis Data Streams:
+
+<table class="table table-bordered">

Review comment:
   I haven't checked it myself, but have you checked if the rendered layout works as expected in a table?

##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -29,9 +29,26 @@ under the License.
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">KDS Connectivity</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+      <td>Consumer</td>

Review comment:
   ```suggestion
      <td>Source</td>
   ```

##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -29,9 +29,26 @@ under the License.
-To use the connector, add the following Maven dependency to your project:
-
-{{< artifact flink-connector-kinesis >}}
+To use this connector, add one or more of the following dependencies to your project, depending on whether you are reading from and/or writing to Kinesis Data Streams:

Review comment:
   I'm a bit surprised that there will be two separate dependencies for a source and a sink. Is that temporary or permanent? I don't see that for any other connector.
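For reference, the split this comment asks about would end up looking roughly like this in the docs, reusing the page's existing `{{< artifact >}}` shortcode. The sink artifact ID below is an assumption for illustration; the actual module name is decided in the PR, not here:

```
{{< artifact flink-connector-kinesis >}}
{{< artifact flink-connector-aws-kinesis-streams >}}
```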
##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -566,124 +583,124 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant
-## Kinesis Producer
-
-The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream.
+## Kinesis Data Streams Sink
+
+The Kinesis Data Streams sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to put data from a Flink stream into a Kinesis stream.
 
-To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard.
+To put data into a Kinesis stream, make sure the stream is marked as “ACTIVE” in the AWS dashboard.

Review comment:
   ```suggestion
To write data into a Kinesis stream, make sure the stream is marked as “ACTIVE” in the AWS dashboard.
   ```
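On the “ACTIVE” check mentioned in this hunk: besides the AWS dashboard, the status can also be verified programmatically with the same AWS v2 SDK the sink builds on. A minimal sketch, with the stream name as a placeholder:

```java
import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamSummaryRequest;
import software.amazon.awssdk.services.kinesis.model.StreamStatus;

try (KinesisClient kinesis = KinesisClient.create()) {
    // Resolve the stream's lifecycle status; writes should only target ACTIVE streams.
    StreamStatus status = kinesis
            .describeStreamSummary(DescribeStreamSummaryRequest.builder()
                    .streamName("your-stream-name")
                    .build())
            .streamDescriptionSummary()
            .streamStatus();
    System.out.println("Stream status: " + status);
}
```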
##########
File path: docs/content/docs/ops/metrics.md
##########
@@ -1339,7 +1339,7 @@ Certain RocksDB native metrics are available but disabled by default, you can fi
 #### Kafka Connectors
 Please refer to [Kafka monitoring]({{< ref "docs/connectors/datastream/kafka" >}}/#monitoring).
 
-#### Kinesis Connectors
+#### Kinesis Consumer

Review comment:
   ```suggestion
#### Kinesis Source
   ```

##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -566,124 +583,124 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant
+## Kinesis Data Streams Sink
+
+The Kinesis Data Streams sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to put data from a Flink stream into a Kinesis stream.
 
 {{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61c" >}}
 {{< tab "Java" >}}
 ```java
+ElementConverter<String, PutRecordsRequestEntry> elementConverter =
+    KinesisDataStreamsSinkElementConverter.<String>builder()
+        .setSerializationSchema(new SimpleStringSchema())
+        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))
+        .build();
+
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+
+KinesisDataStreamsSink<String> kdsSink =
+    KinesisDataStreamsSink.<String>builder()
+        .setKinesisClientProperties(sinkProperties)  // Required
+        .setElementConverter(elementConverter)       // Required
+        .setStreamName("your-stream-name")           // Required
+        .setFailOnError(false)                       // Optional
+        .setMaxBatchSize(500)                        // Optional
+        .setMaxInFlightRequests(16)                  // Optional
+        .setMaxBufferedRequests(10_000)              // Optional
+        .setMaxBatchSizeInBytes(5 * 1024 * 1024)     // Optional
+        .setMaxTimeInBufferMS(5000)                  // Optional
+        .setMaxRecordSizeInBytes(1 * 1024 * 1024)    // Optional
+        .build();
 
 DataStream<String> simpleStringStream = ...;
-simpleStringStream.addSink(kinesis);
+simpleStringStream.sinkTo(kdsSink);
 ```
 {{< /tab >}}
 {{< /tabs >}}
 
+The above is a simple example of using the Kinesis Data Streams sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the builder. The default values for the optional configurations are shown above.
+
+You will always need to supply a `KinesisDataStreamsSinkElementConverter` during sink creation. This is where you specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record.
+
+Some or all of the records in a request may fail to be persisted by Kinesis Data Streams for a number of reasons. If `failOnError` is on, then a runtime exception will be raised. Otherwise those records will be requeued in the buffer for retry.
+
+## Kinesis Producer
+
+{{< hint warning >}}
+`flink-connector-kinesis` is deprecated and may be removed with a future release of Flink, please use [Kinesis Data Streams Sink]({{<ref "docs/connectors/datastream/kinesis#kinesis-data-streams-sink">}}) instead.
+{{< /hint >}}
+For older references you can look at the Flink 1.14 <a href="https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kinesis/#kinesis-producer">documentation</a>.

Review comment:
   We don't link to older versions of the documentation, since users can switch between versions themselves.
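A side note on the converter shown in the hunk above: `setPartitionKeyGenerator` is where per-record partitioning logic lives. A hedged sketch of a converter for a hypothetical `UserEvent` type, assuming the builder API exactly as it appears in the diff and assuming the serialization schema can be given as a lambda:

```java
// Sketch under assumptions: UserEvent, getUserId(), and toJsonBytes() are
// hypothetical; the builder calls mirror the hunk above.
ElementConverter<UserEvent, PutRecordsRequestEntry> eventConverter =
    KinesisDataStreamsSinkElementConverter.<UserEvent>builder()
        // Serialize each event to bytes; a production job would plug in a
        // real (e.g. JSON) SerializationSchema here.
        .setSerializationSchema(event -> event.toJsonBytes())
        // Key by user id so all events of one user land on the same shard,
        // which preserves per-user ordering.
        .setPartitionKeyGenerator(event -> event.getUserId())
        .build();
```

Keying on a stable attribute instead of `hashCode()` keeps related records together on one shard.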
##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -566,124 +583,124 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant
-### Backpressure
-
-By default, `FlinkKinesisProducer` does not backpressure. Instead, records that
-cannot be sent because of the rate restriction of 1 MB per second per shard are
-buffered in an unbounded queue and dropped when their `RecordTtl` expires.
-
-To avoid data loss, you can enable backpressuring by restricting the size of the
-internal queue:
+### Backpressure
+Backpressure in the sink arises as the sink buffer fills up and writes to the sink
+begins to exhibit blocking behaviour. Kinesis Data Streams has a rate restriction of
+1 MB per second per shard.
+
+You can ease backpressuring by adjusting the size of the internal queue:
 
 ```
-// 200 Bytes per record, 1 shard
-kinesis.setQueueLimit(500);
+KinesisDataStreamsSink<String> kdsSink =
+    KinesisDataStreamsSink.<String>builder()
+        ...
+        .setMaxBufferedRequests(10_000)
+        ...
 ```

Review comment:
   ```suggestion
You can ease backpressure by adjusting the size of the internal queue:
   ```
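To put the `setMaxBufferedRequests` knob above in perspective: with the 10,000-request default and records of roughly 1 KB, the buffer holds about 10 MB, i.e. around ten seconds of a single shard's 1 MB/s quota (illustrative numbers, not from the PR). A smaller buffer surfaces backpressure sooner; a larger one absorbs longer bursts at the cost of memory.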
##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -566,124 +583,124 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant
 ## Using Custom Kinesis Endpoints
 
-It is sometimes desirable to have Flink operate as a consumer or producer against a Kinesis VPC endpoint or a non-AWS
+It is sometimes desirable to have Flink operate as a consumer or sink against a Kinesis VPC endpoint or a non-AWS

Review comment:
   ```suggestion
It is sometimes desirable to have Flink operate as a source or sink against a Kinesis VPC endpoint or a non-AWS
   ```
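For orientation, the custom endpoint discussed in that section is configured through the client properties. A sketch, assuming the `AWSConfigConstants.AWS_ENDPOINT` key that the consumer side of this page already uses; the Kinesalite URL is a placeholder:

```java
Properties config = new Properties();
config.put(AWSConfigConstants.AWS_REGION, "us-east-1");
config.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
config.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
// Send all Kinesis traffic to a VPC endpoint or a local Kinesalite instance
// instead of the default public AWS endpoint for the region.
config.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
```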
##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -566,124 +583,124 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant
+## Kinesis Producer
+
+{{< hint warning >}}
+`flink-connector-kinesis` is deprecated and may be removed with a future release of Flink, please use [Kinesis Data Streams Sink]({{<ref "docs/connectors/datastream/kinesis#kinesis-data-streams-sink">}}) instead.

Review comment:
   Double checking: have we already annotated it with the deprecation annotation?
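The annotation being asked about would be the standard Java one on the legacy producer class, roughly as sketched below (the Javadoc wording is illustrative, not the PR's actual source):

```java
/**
 * @deprecated Use {@code KinesisDataStreamsSink}, which is based on the AWS v2
 *     SDK for Java and participates in Flink's checkpointing.
 */
@Deprecated
public class FlinkKinesisProducer<OUT> extends RichSinkFunction<OUT> {
    // ...
}
```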
{{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61c" >}} {{< tab "Java" >}} ```java -Properties producerConfig = new Properties(); -// Required configs -producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); -producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); -producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); -// Optional configs -producerConfig.put("AggregationMaxCount", "4294967295"); -producerConfig.put("CollectionMaxCount", "1000"); -producerConfig.put("RecordTtl", "30000"); -producerConfig.put("RequestTimeout", "6000"); -producerConfig.put("ThreadPoolSize", "15"); - -// Disable Aggregation if it's not supported by a consumer -// producerConfig.put("AggregationEnabled", "false"); -// Switch KinesisProducer's threading model -// producerConfig.put("ThreadingModel", "PER_REQUEST"); - -FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig); -kinesis.setFailOnError(true); -kinesis.setDefaultStream("kinesis_stream_name"); -kinesis.setDefaultPartition("0"); +ElementConverter<String, PutRecordsRequestEntry> elementConverter = + KinesisDataStreamsSinkElementConverter.<String>builder() + .setSerializationSchema(new SimpleStringSchema()) + .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) + .build(); + +Properties sinkProperties = new Properties(); +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1"); +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); + +KinesisDataStreamsSink<String> kdsSink = + KinesisDataStreamsSink.<String>builder() + .setKinesisClientProperties(sinkProperties) // Required + .setElementConverter(elementConverter) // Required + .setStreamName("your-stream-name") // Required + .setFailOnError(false) // Optional + .setMaxBatchSize(500) // Optional + .setMaxInFlightRequests(16) // Optional + .setMaxBufferedRequests(10_000) // Optional + .setMaxBatchSizeInBytes(5 * 1024 * 1024) // Optional + .setMaxTimeInBufferMS(5000) // Optional + .setMaxRecordSizeInBytes(1 * 1024 * 1024) // Optional + .build(); DataStream<String> simpleStringStream = ...; -simpleStringStream.addSink(kinesis); +simpleStringStream.sinkTo(kdsSink); ``` {{< /tab >}} {{< tab "Scala" >}} ```scala -val producerConfig = new Properties() -// Required configs -producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") -producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") -producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") -// Optional KPL configs -producerConfig.put("AggregationMaxCount", "4294967295") -producerConfig.put("CollectionMaxCount", "1000") -producerConfig.put("RecordTtl", "30000") -producerConfig.put("RequestTimeout", "6000") -producerConfig.put("ThreadPoolSize", "15") - -// Disable Aggregation if it's not supported by a consumer -// producerConfig.put("AggregationEnabled", "false") -// Switch KinesisProducer's threading model -// producerConfig.put("ThreadingModel", "PER_REQUEST") - -val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig) -kinesis.setFailOnError(true) -kinesis.setDefaultStream("kinesis_stream_name") -kinesis.setDefaultPartition("0") +val elementConverter = + KinesisDataStreamsSinkElementConverter.<String>builder() + .setSerializationSchema(new SimpleStringSchema()) + .setPartitionKeyGenerator(element -> 
String.valueOf(element.hashCode())) + .build() + +val sinkProperties = new Properties() +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1") +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") + +val kdsSink = KinesisDataStreamsSink.<String>builder() + .setKinesisClientProperties(sinkProperties) // Required + .setElementConverter(elementConverter) // Required + .setStreamName("your-stream-name") // Required + .setFailOnError(false) // Optional + .setMaxBatchSize(500) // Optional + .setMaxInFlightRequests(16) // Optional + .setMaxBufferedRequests(10000) // Optional + .setMaxBatchSizeInBytes(5 * 1024 * 1024) // Optional + .setMaxTimeInBufferMS(5000) // Optional + .setMaxRecordSizeInBytes(1 * 1024 * 1024) // Optional + .build() val simpleStringStream = ... -simpleStringStream.addSink(kinesis) +simpleStringStream.sinkTo(kdsSink) ``` {{< /tab >}} {{< /tabs >}} -The above is a simple example of using the producer. To initialize `FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` instance. Users can also pass in KPL's configurations as optional parameters to customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL configs and explanations can be found [here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties). The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1". +The above is a simple example of using the Kinesis Data Streams sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the builder. The default values for the optional configurations are shown above. -If users don't specify any KPL configs and values, `FlinkKinesisProducer` will use default config values of KPL, except `RateLimit`. `RateLimit` limits the maximum allowed put rate for a shard, as a percentage of the backend limits. KPL's default value is 150 but it makes KPL throw `RateLimitExceededException` too frequently and breaks Flink sink as a result. Thus `FlinkKinesisProducer` overrides KPL's default value to 100. +You will always need to supply a `KinesisDataStreamsSinkElementConverter` during sink creation. This is where you specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record. -Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSchema`. The `KinesisSerializationSchema` allows to send the data to multiple streams. This is -done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream. -Otherwise, the returned stream name is used. +Some or all of the records in a request may fail to be persisted by Kinesis Data Streams for a number of reasons. If `failOnError` is on, then a runtime exception will be raised. Otherwise those records will be requeued in the buffer for retry. -### Threading Model +The KDS Sink provides some metrics through Flink's [metrics system]({{< ref "docs/ops/metrics" >}}) to analyze the behavior of the connector. 
A list of all exposed metrics may be found [here]({{<ref "docs/ops/metrics#kinesis-sink">}}). -Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL from a one-thread-per-request mode to a thread-pool mode. KPL in thread-pool mode uses a queue and thread pool to execute requests to Kinesis. This limits the number of threads that KPL's native process may create, and therefore greatly lowers CPU utilization and improves efficiency. **Thus, We highly recommend Flink users use thread-pool model.** The default thread pool size is `10`. Users can set the pool size in `java.util.Properties` instance with key `ThreadPoolSize`, as shown in the above example. +### KDS Sinks and Fault Tolerance Review comment: For consistency reasons, I think it's worthwhile to make a choice on how to refer to the Kinesis Sink. I see `Kinesis Data Streams Sink`, `KDS Sinks` etc all being used. I can imagine that somewhere in the top you once write that we're talking about `Kinesis Data Streams Sink` for which we'll use Kinesis Sink onwards. That's also consistent how we do it for other connectors ########## File path: docs/content/docs/connectors/datastream/kinesis.md ########## @@ -566,124 +583,124 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured. Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DEREGISTER_STREAM_*` keys. -## Kinesis Producer - -The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream. - -Note that the producer is not participating in Flink's checkpointing and doesn't provide exactly-once processing guarantees. Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details). +## Kinesis Data Streams Sink -In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics. +The Kinesis Data Streams sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to put data from a Flink stream into a Kinesis stream. -To put data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the AWS dashboard. +To put data into a Kinesis stream, make sure the stream is marked as “ACTIVE” in the AWS dashboard. For the monitoring to work, the user accessing the stream needs access to the CloudWatch service. 
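   As an aside on the converter in the hunk above: partitioning on `hashCode()` spreads records randomly across shards. If per-key ordering matters, the partition key can come from a business field instead. A hedged sketch under that assumption (the `Order` type and its schema are hypothetical; only the builder calls mirror the diff, and imports for the PR's new classes are omitted since their packages are defined by this PR):

   ```java
   import java.nio.charset.StandardCharsets;
   import org.apache.flink.api.common.serialization.SerializationSchema;
   import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;

   // Assume a simple POJO: class Order { String customerId; String payload; }
   SerializationSchema<Order> orderSchema =
           order -> order.payload.getBytes(StandardCharsets.UTF_8);

   ElementConverter<Order, PutRecordsRequestEntry> orderConverter =
       KinesisDataStreamsSinkElementConverter.<Order>builder()
           .setSerializationSchema(orderSchema)
           // All records for one customer share a partition key and thus a
           // shard, preserving their relative order per customer.
           .setPartitionKeyGenerator(order -> order.customerId)
           .build();
   ```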
{{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61c" >}} {{< tab "Java" >}} ```java -Properties producerConfig = new Properties(); -// Required configs -producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1"); -producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); -producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); -// Optional configs -producerConfig.put("AggregationMaxCount", "4294967295"); -producerConfig.put("CollectionMaxCount", "1000"); -producerConfig.put("RecordTtl", "30000"); -producerConfig.put("RequestTimeout", "6000"); -producerConfig.put("ThreadPoolSize", "15"); - -// Disable Aggregation if it's not supported by a consumer -// producerConfig.put("AggregationEnabled", "false"); -// Switch KinesisProducer's threading model -// producerConfig.put("ThreadingModel", "PER_REQUEST"); - -FlinkKinesisProducer<String> kinesis = new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig); -kinesis.setFailOnError(true); -kinesis.setDefaultStream("kinesis_stream_name"); -kinesis.setDefaultPartition("0"); +ElementConverter<String, PutRecordsRequestEntry> elementConverter = + KinesisDataStreamsSinkElementConverter.<String>builder() + .setSerializationSchema(new SimpleStringSchema()) + .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) + .build(); + +Properties sinkProperties = new Properties(); +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1"); +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); + +KinesisDataStreamsSink<String> kdsSink = + KinesisDataStreamsSink.<String>builder() + .setKinesisClientProperties(sinkProperties) // Required + .setElementConverter(elementConverter) // Required + .setStreamName("your-stream-name") // Required + .setFailOnError(false) // Optional + .setMaxBatchSize(500) // Optional + .setMaxInFlightRequests(16) // Optional + .setMaxBufferedRequests(10_000) // Optional + .setMaxBatchSizeInBytes(5 * 1024 * 1024) // Optional + .setMaxTimeInBufferMS(5000) // Optional + .setMaxRecordSizeInBytes(1 * 1024 * 1024) // Optional + .build(); DataStream<String> simpleStringStream = ...; -simpleStringStream.addSink(kinesis); +simpleStringStream.sinkTo(kdsSink); ``` {{< /tab >}} {{< tab "Scala" >}} ```scala -val producerConfig = new Properties() -// Required configs -producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1") -producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") -producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") -// Optional KPL configs -producerConfig.put("AggregationMaxCount", "4294967295") -producerConfig.put("CollectionMaxCount", "1000") -producerConfig.put("RecordTtl", "30000") -producerConfig.put("RequestTimeout", "6000") -producerConfig.put("ThreadPoolSize", "15") - -// Disable Aggregation if it's not supported by a consumer -// producerConfig.put("AggregationEnabled", "false") -// Switch KinesisProducer's threading model -// producerConfig.put("ThreadingModel", "PER_REQUEST") - -val kinesis = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig) -kinesis.setFailOnError(true) -kinesis.setDefaultStream("kinesis_stream_name") -kinesis.setDefaultPartition("0") +val elementConverter = + KinesisDataStreamsSinkElementConverter.<String>builder() + .setSerializationSchema(new SimpleStringSchema()) + .setPartitionKeyGenerator(element -> 
String.valueOf(element.hashCode())) + .build() + +val sinkProperties = new Properties() +// Required +sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1") +sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") +sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") + +val kdsSink = KinesisDataStreamsSink.<String>builder() + .setKinesisClientProperties(sinkProperties) // Required + .setElementConverter(elementConverter) // Required + .setStreamName("your-stream-name") // Required + .setFailOnError(false) // Optional + .setMaxBatchSize(500) // Optional + .setMaxInFlightRequests(16) // Optional + .setMaxBufferedRequests(10000) // Optional + .setMaxBatchSizeInBytes(5 * 1024 * 1024) // Optional + .setMaxTimeInBufferMS(5000) // Optional + .setMaxRecordSizeInBytes(1 * 1024 * 1024) // Optional + .build() val simpleStringStream = ... -simpleStringStream.addSink(kinesis) +simpleStringStream.sinkTo(kdsSink) ``` {{< /tab >}} {{< /tabs >}} -The above is a simple example of using the producer. To initialize `FlinkKinesisProducer`, users are required to pass in `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` via a `java.util.Properties` instance. Users can also pass in KPL's configurations as optional parameters to customize the KPL underlying `FlinkKinesisProducer`. The full list of KPL configs and explanations can be found [here](https://github.com/awslabs/amazon-kinesis-producer/blob/master/java/amazon-kinesis-producer-sample/default_config.properties). The example demonstrates producing a single Kinesis stream in the AWS region "us-east-1". +The above is a simple example of using the Kinesis Data Streams sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the builder. The default values for the optional configurations are shown above. -If users don't specify any KPL configs and values, `FlinkKinesisProducer` will use default config values of KPL, except `RateLimit`. `RateLimit` limits the maximum allowed put rate for a shard, as a percentage of the backend limits. KPL's default value is 150 but it makes KPL throw `RateLimitExceededException` too frequently and breaks Flink sink as a result. Thus `FlinkKinesisProducer` overrides KPL's default value to 100. +You will always need to supply a `KinesisDataStreamsSinkElementConverter` during sink creation. This is where you specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record. -Instead of a `SerializationSchema`, it also supports a `KinesisSerializationSchema`. The `KinesisSerializationSchema` allows to send the data to multiple streams. This is -done using the `KinesisSerializationSchema.getTargetStream(T element)` method. Returning `null` there will instruct the producer to write the element to the default stream. -Otherwise, the returned stream name is used. +Some or all of the records in a request may fail to be persisted by Kinesis Data Streams for a number of reasons. If `failOnError` is on, then a runtime exception will be raised. Otherwise those records will be requeued in the buffer for retry. -### Threading Model +The KDS Sink provides some metrics through Flink's [metrics system]({{< ref "docs/ops/metrics" >}}) to analyze the behavior of the connector. 
A list of all exposed metrics may be found [here]({{<ref "docs/ops/metrics#kinesis-sink">}}). -Since Flink 1.4.0, `FlinkKinesisProducer` switches its default underlying KPL from a one-thread-per-request mode to a thread-pool mode. KPL in thread-pool mode uses a queue and thread pool to execute requests to Kinesis. This limits the number of threads that KPL's native process may create, and therefore greatly lowers CPU utilization and improves efficiency. **Thus, We highly recommend Flink users use thread-pool model.** The default thread pool size is `10`. Users can set the pool size in `java.util.Properties` instance with key `ThreadPoolSize`, as shown in the above example. +### KDS Sinks and Fault Tolerance -Users can still switch back to one-thread-per-request mode by setting a key-value pair of `ThreadingModel` and `PER_REQUEST` in `java.util.Properties`, as shown in the code commented out in above example. +The sink is designed to participate in Flink's checkpointing to provide at-least-once processing guarantees. It does this by flushing the entire contents of the buffer when a checkpoint reaches the sink. This effectively assures all requests that were triggered before the checkpoint have been successfully acknowledged by Kinesis Data Streams, before proceeding to process more records sent to the sink. -### Backpressure +In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. Also, the sink does not guarantee that records are written in order to the shards. -By default, `FlinkKinesisProducer` does not backpressure. Instead, records that -cannot be sent because of the rate restriction of 1 MB per second per shard are -buffered in an unbounded queue and dropped when their `RecordTtl` expires. +To use fault tolerant KDS Sinks, checkpointing of the topology needs to be enabled at the execution environment. -To avoid data loss, you can enable backpressuring by restricting the size of the -internal queue: +### Backpressure +Backpressure in the sink arises as the sink buffer fills up and writes to the sink +begins to exhibit blocking behaviour. Kinesis Data Streams has a rate restriction of +1 MB per second per shard. + +You can ease backpressuring by adjusting the size of the internal queue: ``` -// 200 Bytes per record, 1 shard -kinesis.setQueueLimit(500); +KinesisDataStreamsSink<String> kdsSink = + KinesisDataStreamsSink.<String>builder() + ... + .setMaxBufferedRequests(10_000) + ... ``` -The value for `queueLimit` depends on the expected record size. To choose a good -value, consider that Kinesis is rate-limited to 1MB per second per shard. If -less than one second's worth of records is buffered, then the queue may not be -able to operate at full capacity. With the default `RecordMaxBufferedTime` of -100ms, a queue size of 100kB per shard should be sufficient. The `queueLimit` -can then be computed via +The sink default maximum record size is 1MB and maximum batch size is 5MB in line with the Kinesis Data Streams maximums. Review comment: I would keep this line and remove the reference to the default size at line 679-680 ########## File path: docs/content/docs/connectors/datastream/kinesis.md ########## @@ -566,124 +583,124 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured. 
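   One thing that might be worth spelling out next to the fault-tolerance paragraph above: the at-least-once guarantee only holds once checkpointing is actually enabled. A minimal sketch, assuming a standard `StreamExecutionEnvironment` setup (the 10-second interval is an arbitrary example value):

   ```java
   import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

   StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
   // The sink flushes its buffer on every checkpoint; without checkpointing
   // the at-least-once guarantee described above does not apply.
   env.enableCheckpointing(10_000L); // example interval: 10 seconds
   ```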
##########
File path: docs/content/docs/connectors/datastream/kinesis.md
##########
@@ -566,124 +583,124 @@ Retry and backoff parameters can be configured using the `ConsumerConfigConstant
 this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured.
 Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.
 
-## Kinesis Producer
-
-The `FlinkKinesisProducer` uses [Kinesis Producer Library (KPL)](http://docs.aws.amazon.com/streams/latest/dev/developing-producers-with-kpl.html) to put data from a Flink stream into a Kinesis stream.
-
-Note that the producer is not participating in Flink's checkpointing and doesn't provide exactly-once processing guarantees. Also, the Kinesis producer does not guarantee that records are written in order to the shards (See [here](https://github.com/awslabs/amazon-kinesis-producer/issues/23) and [here](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#API_PutRecord_RequestSyntax) for more details).
+## Kinesis Data Streams Sink
 
-In case of a failure or a resharding, data will be written again to Kinesis, leading to duplicates. This behavior is usually called "at-least-once" semantics.
+The Kinesis Data Streams sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to put data from a Flink stream into a Kinesis stream.

Review comment:
   ```suggestion
   The Kinesis Data Streams sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream.
   ```

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
