This is an automated email from the ASF dual-hosted git repository.
hong pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git
The following commit(s) were added to refs/heads/asf-site by this push:
new f3303c44b Add blogpost for new KinesisStreamsSource and
DynamoDbStreamsSource
f3303c44b is described below
commit f3303c44b8279bd04db7c0e57a77a84d97bc025d
Author: Hong Teoh <[email protected]>
AuthorDate: Thu Nov 21 18:34:02 2024 +0000
Add blogpost for new KinesisStreamsSource and DynamoDbStreamsSource
---
.../2024-11-25-whats-new-aws-connectors-5.0.0.md | 212 +++++++++++++++++++++
.../kinesis_records_sharding.png | Bin 0 -> 242869 bytes
.../kinesis_resharding.png | Bin 0 -> 124755 bytes
3 files changed, 212 insertions(+)
diff --git a/docs/content/posts/2024-11-25-whats-new-aws-connectors-5.0.0.md
b/docs/content/posts/2024-11-25-whats-new-aws-connectors-5.0.0.md
new file mode 100644
index 000000000..ec6ea96a9
--- /dev/null
+++ b/docs/content/posts/2024-11-25-whats-new-aws-connectors-5.0.0.md
@@ -0,0 +1,212 @@
+---
+title: "Introducing the new Amazon Kinesis Data Stream and Amazon DynamoDB
Stream sources"
+date: "2024-11-25T18:00:00.000Z"
+authors:
+- hong:
+ name: "Hong Liang Teoh"
+---
+
+
+We are pleased to introduce updated versions of the Amazon Kinesis Data Stream
and Amazon DynamoDB Stream sources. Built on the [FLIP-27 source
interface](https://cwiki.apache.org/confluence/display/FLINK/FLIP-27%3A+Refactor+Source+Interface),
these newer connectors introduce 7 new features and are compatible with Flink
2.0.
+
+The new
[`KinesisStreamsSource`](https://github.com/apache/flink-connector-aws/blob/v5.0/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java)
replaces the legacy
[`FlinkKinesisConsumer`](https://github.com/apache/flink-connector-aws/blob/v5.0/flink-connector-aws/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumer.java);
and the new [`DynamoDbStreamsSo [...]
+
+In this blogpost, we will dive into the motivation for the new source
connectors, the improvements introduced, and provide migration guidance for
users.
+
+## Dependencies
+
+<table>
+ <tr>
+ <th>Connector</th>
+ <th>API</th>
+ <th>Dependency</th>
+ <th>Usage</th>
+ </tr>
+ <tr>
+ <td>Amazon Kinesis Data Streams source</td>
+ <td>DataStream<br>Table API</td>
+ <td> Use the <code>flink-connector-aws-kinesis-streams</code> artifact.
See <a
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/">
+ Flink Kinesis connector documentation</a> for details.
+ </td>
+ <td>
+ Use the fluent
+ <a
href="https://github.com/apache/flink-connector-aws/blob/v5.0/flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSourceBuilder.java">
+ KinesisStreamsSourceBuilder</a> to create the source. Look at the <a
href="#example-migrating-flinkkinesisconsumer-to-kinesisstreamssource">migration
guidance section</a> for more details.
+ </td>
+ </tr>
+ <tr>
+ <td>Amazon Kinesis Data Streams source</td>
+ <td>SQL</td>
+ <td> Use the <code>flink-sql-connector-aws-kinesis-streams</code>
artifact. See <a
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kinesis/">
+ Flink SQL Kinesis connector documentation</a> for details.
+ </td>
+ <td>
+ Use the table identifier <code>kinesis</code>. See the
+ <a
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/table/kinesis/">
+ Flink SQL Kinesis connector documentation</a> for configuration and
usage details.
+ </td>
+ </tr>
+ <tr>
+ <td>Amazon DynamoDB Streams source</td>
+ <td>DataStream</td>
+ <td> Use the <code>flink-connector-dynamodb</code> artifact. See <a
href="https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/">
+ Flink DynamoDB connector documentation</a> for details.
+ </td>
+ <td>
+ Use the fluent
+ <a
href="https://github.com/apache/flink-connector-aws/blob/v5.0/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/source/DynamoDbStreamsSourceBuilder.java">
+ DynamoDbStreamsSourceBuilder</a> to create the source. Look at the <a
href="#example-migrating-flinkdynamodbstreamsconsumer-to-dynamodbstreamssource">migration
guidance section</a> for more details.
+ </td>
+ </tr>
+</table>
+
+
+## Why did we need new source connectors?
+
+We implemented new source connectors because the `FlinkKinesisConsumer` and
`FlinkDynamoDBStreamsConsumer` use the deprecated `SourceFunction` interface,
which is removed in Flink 2.x. From Flink 2.x onwards, only
`KinesisStreamsSource` and `DynamoDbStreamsSource`, which use the new `Source`
interface will be supported.
+
+In addition, the new `Source` interface introduces new features and
standardisation across various Flink sources, such as [unified
metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics),
[native watermark
handling](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/event-time/generating_watermarks/),
and support for coordination via a `SourceEnumerator` component running on the
JobManager.
+
+## New features
+
+The updated `KinesisStreamsSource` and `DynamoDbStreamsSource` connectors
offer the following new features:
+
+1. **Native Flink watermark integration.** On the new `Source` interface,
watermark generation is abstracted away to the Flink framework, and no longer a
responsibility of the source. This means the new source has support for
watermark alignment, and idle watermark handling out-of-the-box.
+2. **Standardised Flink Source metrics.** The new `Source` framework also
introduces [standardised Source
metrics](https://cwiki.apache.org/confluence/display/FLINK/FLIP-33%3A+Standardize+Connector+Metrics).
This enables users to track record throughput and lag across sources in a
standardised manner.
+3. **Records are read in-order even after a resharding operation on the
stream.** The new `Source` ensures that parent shards are read completely
before reading children shards. This allows record ordering to be maintained
even after a resharding operation. See [explanation of record ordering in
Kinesis Data Streams](#appendix-detailed-explanation-of-record-ordering) for
more information.
+4. **Migrate away from AWS SDK v1 to AWS SDK v2.** This SDK update aligns with
best practices.
+5. **Migrate away from custom retry strategies to use the AWS SDK native retry
strategies.** This allows us to benefit from AWS error classification in the
retry algorithm.
+6. **Reduce jar size by >99%, from
[~60MB](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kinesis/5.0.0-1.20)
to
[~200KB](https://mvnrepository.com/artifact/org.apache.flink/flink-connector-aws-kinesis-streams/5.0.0-1.20).**
In the new source, we no longer shade the AWS SDK and no longer need to
package the Kinesis Producer Library (needed for legacy sink). This will lead
to smaller Flink application jars. Note that users will still need to shade the
AWS SDK into the [...]
+7. **Improve defaults.** The `UniformShardAssigner` is now the default shard
assigner. This change results in a uniform shard distribution across Flink
subtasks and can reduce unexpected processing skew.
+
+
+## Breaking changes
+
+During the implementation of the source Table API, we had to introduce some
breaking changes around the table identifier of `kinesis`. This necessitated a
major version bump from `4.x` to `5.x`. In Table API / SQL, for version `4.x`
and below, `kinesis` refers to the old `FlinkKinesisConsumer`. However, from
`5.x` onwards, `kinesis` now refers to the new `KinesisStreamSource`. To use
the old `FlinkKinesisConsumer` with `5.x`, you can use the table identifier of
`kinesis-legacy`. See [`K [...]
+
+## Migration guidance
+
+There is no state compatibility between the legacy sources
(`FlinkKinesisConsumer` and `FlinkDynamoDBStreamsConsumer`), and the new
sources (`KinesisStreamsSource` and `DynamoDbStreamsSource`). This means that
in order to migrate from the legacy source to the new source, users must drop
the state of the source operator and start from a specified starting position,
to prevent any data loss.
+
+### Example migrating `FlinkKinesisConsumer` to `KinesisStreamsSource`
+
+Here we show a simple example to migrate from `FlinkKinesisConsumer` to
`KinesisStreamsSource`. See [Flink Kinesis connector
documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/)
for more details.
+
+```java
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Old FlinkKinesisConsumer to read from stream test-stream from TRIM_HORIZON
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON");
+FlinkKinesisConsumer<String> oldKinesisConsumer =
+ new FlinkKinesisConsumer<>("test-stream", new SimpleStringSchema(),
consumerConfig);
+DataStream<String> kinesisRecordsFromOldKinesisConsumer =
env.addSource(oldKinesisConsumer)
+ .uid("custom-uid")
+ .assignTimestampsAndWatermarks(
+
WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)))
+
+// New KinesisStreamsSource to read from stream test-stream from TRIM_HORIZON
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION,
KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON);
+KinesisStreamsSource<String> newKdsSource =
+ KinesisStreamsSource.<String>builder()
+
.setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
+ .setSourceConfig(sourceConfig)
+ .setDeserializationSchema(new SimpleStringSchema())
+ .build();
+DataStream<String> kinesisRecordsWithEventTimeWatermarks = env.fromSource(
+ kdsSource,
+
WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)),
+ "Kinesis source")
+ .returns(TypeInformation.of(String.class))
+ .uid("custom-uid");
+```
+
+### Example migrating `FlinkDynamoDBStreamsConsumer` to `DynamoDbStreamsSource`
+
+Here we show a simple example to migrate from `FlinkDynamoDBStreamsConsumer`
to `DynamoDbStreamsSource`. See [Flink DynamoDB connector
documentation](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/)
for more details.
+
+```java
+StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Old FlinkDynamoDBStreamsConsumer to read from stream test stream from
TRIM_HORIZON
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
"TRIM_HORIZON");
+FlinkDynamoDBStreamsConsumer<String> oldDynamodbStreamsConsumer =
+ new
FlinkDynamoDBStreamsConsumer<>("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380",
new SimpleStringSchema(), consumerConfig);
+DataStream<String> dynamodbRecordsFromOldDynamodbStreamsConsumer =
env.addSource(oldDynamodbStreamsConsumer)
+ .uid("custom-uid")
+ .assignTimestampsAndWatermarks(
+
WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)))
+
+// New DynamoDbStreamsSource to read from stream test stream from TRIM_HORIZON
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION,
DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON);
+KinesisStreamsSource<String> newDynamoDbStreamsSource =
+ DynamoDbStreamsSource.<String>builder()
+
.setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
+ .setSourceConfig(sourceConfig)
+ // User must implement their own deserialization schema to translate
change data capture events into custom data types
+ .setDeserializationSchema(dynamodbDeserializationSchema)
+ .build();
+DataStream<String> dynamodbRecordsWithEventTimeWatermarks = env.fromSource(
+ newDynamoDbStreamsSource,
+
WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)),
+ "DynamoDB Streams source")
+ .returns(TypeInformation.of(String.class))
+ .uid("custom-uid");
+```
+
+## Summary
+In this blog, we have covered the motivation behind creating the new
`KinesisStreamsSource` and `DynamoDbStreamsSource` connectors, highlighting
their new features and migration guidance. Feel free to reach out on the Flink
mailing list ([[email protected]](mailto:[email protected])) or Flink
Slack to discuss any further improvements.
+
+To get started with the connectors, follow one the guides below!
+
+* [**Amazon Kinesis Data Streams
Source**](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/#kinesis-streams-source)
+* [**Amazon DynamoDB Streams
Source**](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/#amazon-dynamodb-streams-source)
+
+
+## Further work
+Support for the below additional features are also in progress:
+* [Support for Table API and SQL for the DynamoDB Streams
Source.](https://issues.apache.org/jira/browse/FLINK-34340)
+* [Datastream Python
integration](https://issues.apache.org/jira/browse/FLINK-31988) for both
sources are not implemented yet, but we recognize their importance to users.
+
+
+## List of Contributors
+Abhi Gupta, Aleksandr Pilipenko, Burak Ozakinci, Elphas Toringepi, Lorenzo
Nicora, Danny Cranmer, Hong Teoh
+
+
+## Appendix: Detailed explanation of record ordering
+
+As a scalable streaming data store, the data records belonging to a Kinesis
Data Stream are segregated across multiple shards, based on the partition key
specified when writing the data record. Record ordering is maintained only
within the same shard. Since records with the same partition key are written to
the same shard, record ordering is maintained for a given partition key.
+
+The situation gets complicated when the stream is resharded to scale the
stream read/write capacity. During the resharding of a stream, shards go
through split and merge operations. A split operation splits one parent shard
into two smaller child shards. A merge operation merges two parent shards into
one larger child shard.
+
+An example of resharding a stream from 2 open shards to 3 open shards is shown
below.
+
+<center>
+<br/>
+<img
src="/img/blog/2024-11-25-whats-new-aws-connectors/kinesis_resharding.png"
width="60%"/>
+<br/>
+Fig. 1 - Illustration of uniform resharding event in Kinesis Data Stream from
2 shards to 3 shards.
+</center>
+
+In the diagram, the stream goes from having 2 open shards (0, 1) to having 3
open shards (2, 5, 6) and 4 closed shards (0, 1, 3, 4). Closed shards can
contain records, but will no longer receive any new records, whereas open
shards can still receive new records. The reason for multiple split/merge
operations during this resharding is to prevent record skew across shards.
+
+The diagram below illustrates what could happen when records with a given
ordering within the same partition key are written to the stream.
+
+<center>
+<br/>
+<img
src="/img/blog/2024-11-25-whats-new-aws-connectors/kinesis_records_sharding.png"
width="60%"/>
+<br/>
+Fig. 2 - Illustration of record distribution within a Kinesis Data Stream
after a resharding operation.
+</center>
+
+As we can see, to ensure that records from the `pk2` are read in order, we
need to ensure that the shards are read in order of `Shard 0`, then `Shard 3`,
then `Shard 6`. This can be more easily understood as : All parent shards must
be fully read before children shard can be read.
+
+The new `KinesisStreamsSource` ensures that parent shards are read completely
before reading the children shard, and so ensure that record ordering is
maintained even after a resharding operation on the stream.
+
+
+
diff --git
a/docs/static/img/blog/2024-11-25-whats-new-aws-connectors/kinesis_records_sharding.png
b/docs/static/img/blog/2024-11-25-whats-new-aws-connectors/kinesis_records_sharding.png
new file mode 100644
index 000000000..ccf2fb8d8
Binary files /dev/null and
b/docs/static/img/blog/2024-11-25-whats-new-aws-connectors/kinesis_records_sharding.png
differ
diff --git
a/docs/static/img/blog/2024-11-25-whats-new-aws-connectors/kinesis_resharding.png
b/docs/static/img/blog/2024-11-25-whats-new-aws-connectors/kinesis_resharding.png
new file mode 100644
index 000000000..40cc058bb
Binary files /dev/null and
b/docs/static/img/blog/2024-11-25-whats-new-aws-connectors/kinesis_resharding.png
differ