nicusX commented on code in PR #179:
URL: 
https://github.com/apache/flink-connector-aws/pull/179#discussion_r1831489230


##########
docs/content/docs/connectors/datastream/dynamodb.md:
##########
@@ -141,7 +268,7 @@ annotations see 
[here](https://docs.aws.amazon.com/sdk-for-java/latest/developer
 
 A sample application using a custom `ElementConverter` can be found 
[here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java).
 A sample application using the `DynamoDbBeanElementConverter` can be found 
[here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java).

Review Comment:
   I would show a snippet of the converter's `apply()` method, using something 
more realistic than the linked example.
   
   In the example the input record is a `Long`. In reality it would be a POJO.
   In the example the partitionKey and sortKey are random. In reality they 
would be mapped from fields of the POJO.
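   
   As a rough sketch of what I mean (not the connector's API): `Order`, its 
fields and `OrderItemMapper` are made-up names, and a plain 
`Map<String, String>` stands in for the `Map<String, AttributeValue>` item that 
a real `apply()` would build, so the snippet has no dependencies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical POJO flowing through the job (not part of the connector).
final class Order {
    final String customerId;
    final long orderTimestamp;
    final double amount;

    Order(String customerId, long orderTimestamp, double amount) {
        this.customerId = customerId;
        this.orderTimestamp = orderTimestamp;
        this.amount = amount;
    }
}

// Stand-in for the body of a converter's apply(): the keys are derived
// from POJO fields instead of being random.
final class OrderItemMapper {
    static Map<String, String> toItem(Order order) {
        Map<String, String> item = new LinkedHashMap<>();
        // Partition key mapped from a business identifier.
        item.put("partition_key", order.customerId);
        // Sort key mapped from the event timestamp.
        item.put("sort_key", Long.toString(order.orderTimestamp));
        // Remaining fields become regular attributes.
        item.put("amount", Double.toString(order.amount));
        return item;
    }
}
```

   In the docs, the same mapping would sit inside the custom 
`ElementConverter`, with the attribute names matching the table's key schema.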



##########
docs/content/docs/connectors/datastream/dynamodb.md:
##########
@@ -23,16 +23,143 @@ KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
+# Amazon DynamoDB Connector
+The DynamoDB connector allows users to read/write from [Amazon 
DynamoDB](https://aws.amazon.com/dynamodb/).
 
-# Amazon DynamoDB Sink
+As a source, the connector allows users to read change data capture stream 
from DynamoDB tables using [Amazon DynamoDB 
Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html).
 
-The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) 
using the [AWS v2 SDK for 
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
 Follow the instructions from the [Amazon DynamoDB Developer 
Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
-to setup a table.
+As a sink, the connector allows users to write directly to Amazon DynamoDB 
tables using the [BatchWriteItem 
API](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html).
 
+
+## Dependency
+
+Apache Flink ships the connector for users to utilize.
 
 To use the connector, add the following Maven dependency to your project:
 
 {{< connector_artifact flink-connector-dynamodb dynamodb >}}
 
+
+## Amazon DynamoDB Streams Source
+
+The DynamoDB streams source reads from [Amazon DynamoDB 
Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)
 using the [AWS v2 SDK for 
Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
+Follow the instructions from the [AWS 
docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html)
 to set up and configure the change data capture stream. 
+
+The actual events streamed to the DynamoDB Stream depends on the 
`StreamViewType` specified on the DynamoDB Stream itself. 
+See [AWS 
docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling)
 for more information.
+
+### Usage
+
+The `DynamoDbStreamsSource` provides a fluent builder to construct an instance 
of the `DynamoDbStreamsSource`.
+The code snippet below illustrates how to do so.
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120001" >}}
+{{< tab "Java" >}}
+```java
+// Configure the DynamodbStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, 
DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON); // This is 
optional, by default connector will read from LATEST
+
+// Create a new DynamoDbStreamsSource to read from the specified DynamoDB 
Stream.
+DynamoDbStreamsSource<String> dynamoDbStreamsSource = 
+        DynamoDbStreamsSource.<String>builder()
+                
.setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
+                .setSourceConfig(sourceConfig)
+                // User must implement their own deserialization schema to 
translate change data capture events into custom data types    
+                .setDeserializationSchema(dynamodbDeserializationSchema) 
+                .build();
+
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Specify watermarking strategy and the name of the DynamoDB Streams Source 
operator.
+// Specify return type using TypeInformation.
+// Specify also UID of operator in line with Flink best practice.
+DataStream<String> cdcEventsWithEventTimeWatermarks = 
env.fromSource(dynamoDbStreamsSource, 
WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)),
 "DynamoDB Streams source")
+        .returns(TypeInformation.of(String.class))
+        .uid("custom-uid");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+// Configure the DynamodbStreamsSource
+val sourceConfig = new Configuration()
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, 
DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON) // This is 
optional, by default connector will read from LATEST
+
+// Create a new DynamoDbStreamsSource to read from the specified DynamoDB 
Stream.
+val dynamoDbStreamsSource = DynamoDbStreamsSource.builder[String]()
+  
.setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
+  .setSourceConfig(sourceConfig)
+  // User must implement their own deserialization schema to translate change 
data capture events into custom data types    
+  .setDeserializationSchema(dynamodbDeserializationSchema)
+  .build()
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+// Specify watermarking strategy and the name of the DynamoDB Streams Source 
operator.
+// Specify return type using TypeInformation.
+// Specify also UID of operator in line with Flink best practice.
+val cdcEventsWithEventTimeWatermarks = env.fromSource(dynamoDbStreamsSource, 
WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)),
 "DynamoDB Streams source")
+  .uid("custom-uid")
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The above is a simple example of using the `DynamoDbStreamsSource`.
+- The DynamoDB Stream being read from is specified using the stream ARN.
+- Configuration for the `Source` is supplied using an instance of Flink's 
`Configuration` class.
+  The configuration keys can be taken from `AWSConfigOptions` (AWS-specific 
configuration) and `DynamodbStreamsSourceConfigConstants` (DynamoDB Streams 
Source configuration).
+- The example specifies the starting position as `TRIM_HORIZON` (see 
[Configuring Starting Position](#configuring-starting-position) for more 
information).
+- The deserialization format is as `SimpleStringSchema` (see [Deserialization 
Schema](#deserialization-schema) for more information).
+- The distribution of shards across subtasks is controlled using the 
`UniformShardAssigner`  (see [Shard Assignment 
Strategy](#shard-assignment-strategy) for more information).
+- The example also specifies an increasing `WatermarkStrategy`, which means 
each record will be tagged with event time specified using 
`approximateCreationDateTime`.
+  Monotonically increasing watermarks will be generated, and subtasks will be 
considered idle if no record is emitted after 1 second.
+
+### Configuring Starting Position
+
+To specify the starting position of the `DynamodbStreamsSource`, users can set 
the `DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION` in 
configuration.
+- `LATEST`: read all shards of the stream starting from the latest record.
+- `TRIM_HORIZON`: read all shards of the stream starting from the earliest 
record possible (data is trimmed by DynamoDb after 24 hours).
+
+### Deserialization Schema
+
+The `DynamoDbStreamsSource` provides the 
`DynamoDbStreamsDeserializationSchema<T>` interface to allow users to implement 
their own
+deserialization schema to convert DynamoDB change data capture events into 
custom event types.
+
+The `DynamoDbStreamsDeserializationSchema<T>#deserialize` method takes in an 
instance of `Record` from the DynamoDB model.
+The `Record` can contain different content, depending on the configuration of 
the DynamoDB Stream. See [AWS 
docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling)
 for more information.
+
+### Shard Assignment Strategy
+
+The `UniformShardAssigner` allocates shards to parallel subtasks of the 
`Source` operator based on which parallel subtask has the lowest number of 
allocated shards.
+The idea is to distribute change data capture events as evenly as possible 
across parallel subtasks.

Review Comment:
   Question: does the connector follow the lineage of the shards? 
   There are implications for ordering, in particular when you start from 
`TRIM_HORIZON`.
   If the source does not follow the shard lineage, change events may be 
replayed out of order. This is something that should be highlighted.
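   
   To make the concern concrete, this is a sketch of what following the 
lineage would mean. It is not taken from the connector, and whether the 
connector behaves this way is exactly the question. `ShardInfo` and 
`LineageAwareScheduler` are hypothetical names; the only thing assumed from 
DynamoDB Streams is that a shard may reference a parent shard ID.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, minimal view of a shard: its ID and an optional parent ID.
final class ShardInfo {
    final String shardId;
    final String parentShardId; // null when the shard has no parent

    ShardInfo(String shardId, String parentShardId) {
        this.shardId = shardId;
        this.parentShardId = parentShardId;
    }
}

final class LineageAwareScheduler {
    // Returns the shards that may be read now: not finished yet, and whose
    // parent is absent, unknown (for example already trimmed), or fully read.
    static List<String> readableShards(List<ShardInfo> shards, Set<String> finishedShardIds) {
        Set<String> knownShardIds = new HashSet<>();
        for (ShardInfo shard : shards) {
            knownShardIds.add(shard.shardId);
        }
        List<String> readable = new ArrayList<>();
        for (ShardInfo shard : shards) {
            if (finishedShardIds.contains(shard.shardId)) {
                continue;
            }
            boolean blockedByParent = shard.parentShardId != null
                    && knownShardIds.contains(shard.parentShardId)
                    && !finishedShardIds.contains(shard.parentShardId);
            if (!blockedByParent) {
                readable.add(shard.shardId);
            }
        }
        return readable;
    }
}
```

   Reading a child shard only after its parent is exhausted is what keeps 
changes in order when replaying from `TRIM_HORIZON`.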
   



##########
docs/content/docs/connectors/datastream/dynamodb.md:
##########
@@ -23,16 +23,143 @@ KIND, either express or implied.  See the License for the
+### Shard Assignment Strategy
+
+The `UniformShardAssigner` allocates shards to parallel subtasks of the 
`Source` operator based on which parallel subtask has the lowest number of 
allocated shards.
+The idea is to distribute change data capture events as evenly as possible 
across parallel subtasks.

Review Comment:
   I am also putting this here because it relates to ordering, which is an 
important subject, and we need to be explicit about it in the docs.
   
   I assume DynamoDB Streams (DDBS) shards by record primary key, otherwise it 
could not guarantee any order, but I can't find this in the DDB documentation. 
   If that is the case, we should mention it, because Flink will get change 
events in order per PK (as long as the source is consuming the latest changes, 
or also when replaying from `TRIM_HORIZON`, if the source follows the lineage 
of the shards).



##########
docs/content/docs/connectors/datastream/dynamodb.md:
##########
@@ -23,16 +23,143 @@ KIND, either express or implied.  See the License for the
+### Shard Assignment Strategy
+
+The `UniformShardAssigner` allocates shards to parallel subtasks of the 
`Source` operator based on which parallel subtask has the lowest number of 
allocated shards.
+The idea is to distribute change data capture events as evenly as possible 
across parallel subtasks.

Review Comment:
   This is not very clear. I would start by explaining the main effect of this 
assigner, then explain how it works and why. Something like this:
   
   > The `UniformShardAssigner` allocates the shards of the DynamoDB Stream 
evenly across the parallel subtasks of the source operator. DynamoDB Stream 
shards are ephemeral and are created and deleted automatically, as needed. 
`UniformShardAssigner` allocates new shards to the subtask with the lowest 
number of allocated shards. 
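   
   The behaviour described in that wording can be sketched as follows. This is 
an illustration only, not the `UniformShardAssigner` source: 
`LeastLoadedAssigner` is a hypothetical name, and breaking ties by the lowest 
subtask index is my own assumption.

```java
final class LeastLoadedAssigner {
    // shardCounts[i] is the number of shards currently assigned to subtask i.
    // Picks the subtask with the fewest shards, records the new assignment
    // in shardCounts, and returns the chosen subtask index.
    static int assign(int[] shardCounts) {
        if (shardCounts.length == 0) {
            throw new IllegalArgumentException("at least one subtask is required");
        }
        int leastLoaded = 0;
        for (int i = 1; i < shardCounts.length; i++) {
            // Strict comparison: ties go to the lowest index (an assumption).
            if (shardCounts[i] < shardCounts[leastLoaded]) {
                leastLoaded = i;
            }
        }
        shardCounts[leastLoaded]++;
        return leastLoaded;
    }
}
```

   With three subtasks and five new shards, the counts never differ by more 
than one, which is the "evenly" part worth stating up front.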



##########
docs/content/docs/connectors/datastream/dynamodb.md:
##########
@@ -141,7 +268,7 @@ annotations see 
[here](https://docs.aws.amazon.com/sdk-for-java/latest/developer
 
 A sample application using a custom `ElementConverter` can be found 
[here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkIntoDynamoDb.java).
 A sample application using the `DynamoDbBeanElementConverter` can be found 
[here](https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/test/java/org/apache/flink/connector/dynamodb/sink/examples/SinkDynamoDbBeanIntoDynamoDb.java).

Review Comment:
   How partition_key and sort_key are mapped also determines whether the sink 
will upsert or append.
   If we assume the reader knows DDB, we can skip this. Otherwise, we should 
explain it.
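   
   If we do explain it, a tiny model of the semantics may help. The sketch 
below is not DynamoDB or connector code: `InMemoryTable` is a hypothetical 
stand-in that only mimics the fact that a put with an existing primary key 
(partition key plus sort key) replaces the item.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a table with a composite primary key.
final class InMemoryTable {
    private final Map<List<String>, String> items = new HashMap<>();

    // A put with an existing (partitionKey, sortKey) pair replaces the item;
    // a put with a new pair adds one.
    void put(String partitionKey, String sortKey, String payload) {
        items.put(Arrays.asList(partitionKey, sortKey), payload);
    }

    String get(String partitionKey, String sortKey) {
        return items.get(Arrays.asList(partitionKey, sortKey));
    }

    int size() {
        return items.size();
    }
}
```

   Mapping the sort key from something that changes per record (for example an 
event timestamp) gives append behaviour, while mapping both keys from a stable 
identifier gives upsert behaviour.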



##########
docs/content/docs/connectors/datastream/dynamodb.md:
##########
@@ -23,16 +23,143 @@ KIND, either express or implied.  See the License for the
+### Shard Assignment Strategy
+
+The `UniformShardAssigner` allocates shards to parallel subtasks of the 
`Source` operator based on which parallel subtask has the lowest number of 
allocated shards.
+The idea is to distribute change data capture events as evenly as possible 
across parallel subtasks.

Review Comment:
   Why is "`Source`" shown as code? In this case you can say "source operator".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

