hlteoh37 commented on code in PR #179:
URL: https://github.com/apache/flink-connector-aws/pull/179#discussion_r1832475620

##########
docs/content/docs/connectors/datastream/dynamodb.md:
##########
@@ -23,16 +23,143 @@ KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
+# Amazon DynamoDB Connector
+The DynamoDB connector allows users to read/write from [Amazon DynamoDB](https://aws.amazon.com/dynamodb/).
-# Amazon DynamoDB Sink
+As a source, the connector allows users to read change data capture stream from DynamoDB tables using [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html).
-The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
-to setup a table.
+As a sink, the connector allows users to write directly to Amazon DynamoDB tables using the [BatchWriteItem API](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html).
+
+## Dependency
+
+Apache Flink ships the connector for users to utilize. To use the connector, add the following Maven dependency to your project:
 {{< connector_artifact flink-connector-dynamodb dynamodb >}}
+
+## Amazon DynamoDB Streams Source
+
+The DynamoDB streams source reads from [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
+Follow the instructions from the [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) to set up and configure the change data capture stream.
+
+The actual events streamed to the DynamoDB Stream depends on the `StreamViewType` specified on the DynamoDB Stream itself.
+See [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling) for more information.
+
+### Usage
+
+The `DynamoDbStreamsSource` provides a fluent builder to construct an instance of the `DynamoDbStreamsSource`.
+The code snippet below illustrates how to do so.
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120001" >}}
+{{< tab "Java" >}}
+```java
+// Configure the DynamodbStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST
+
+// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream.
+DynamoDbStreamsSource<String> dynamoDbStreamsSource =
+        DynamoDbStreamsSource.<String>builder()
+                .setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
+                .setSourceConfig(sourceConfig)
+                // User must implement their own deserialization schema to translate change data capture events into custom data types
+                .setDeserializationSchema(dynamodbDeserializationSchema)
+                .build();
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Specify watermarking strategy and the name of the DynamoDB Streams Source operator.
+// Specify return type using TypeInformation.
+// Specify also UID of operator in line with Flink best practice.

Review Comment:
   Removed also :)


##########
docs/content/docs/connectors/datastream/dynamodb.md:
##########
@@ -23,16 +23,143 @@ KIND, either express or implied. See the License for the
 specific language governing permissions and limitations
 under the License.
 -->
+# Amazon DynamoDB Connector
+The DynamoDB connector allows users to read/write from [Amazon DynamoDB](https://aws.amazon.com/dynamodb/).
-# Amazon DynamoDB Sink
+As a source, the connector allows users to read change data capture stream from DynamoDB tables using [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html).
-The DynamoDB sink writes to [Amazon DynamoDB](https://aws.amazon.com/dynamodb) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html). Follow the instructions from the [Amazon DynamoDB Developer Guide](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/getting-started-step-1.html)
-to setup a table.
+As a sink, the connector allows users to write directly to Amazon DynamoDB tables using the [BatchWriteItem API](https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html).
+
+## Dependency
+
+Apache Flink ships the connector for users to utilize. To use the connector, add the following Maven dependency to your project:
 {{< connector_artifact flink-connector-dynamodb dynamodb >}}
+
+## Amazon DynamoDB Streams Source
+
+The DynamoDB streams source reads from [Amazon DynamoDB Streams](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) using the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html).
+Follow the instructions from the [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html) to set up and configure the change data capture stream.
+
+The actual events streamed to the DynamoDB Stream depends on the `StreamViewType` specified on the DynamoDB Stream itself.
+See [AWS docs](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.html#Streams.Enabling) for more information.
+
+### Usage
+
+The `DynamoDbStreamsSource` provides a fluent builder to construct an instance of the `DynamoDbStreamsSource`.
+The code snippet below illustrates how to do so.
+
+{{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120001" >}}
+{{< tab "Java" >}}
+```java
+// Configure the DynamodbStreamsSource
+Configuration sourceConfig = new Configuration();
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST
+
+// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream.
+DynamoDbStreamsSource<String> dynamoDbStreamsSource =
+        DynamoDbStreamsSource.<String>builder()
+                .setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
+                .setSourceConfig(sourceConfig)
+                // User must implement their own deserialization schema to translate change data capture events into custom data types
+                .setDeserializationSchema(dynamodbDeserializationSchema)
+                .build();
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+// Specify watermarking strategy and the name of the DynamoDB Streams Source operator.
+// Specify return type using TypeInformation.
+// Specify also UID of operator in line with Flink best practice.
+DataStream<String> cdcEventsWithEventTimeWatermarks = env.fromSource(dynamoDbStreamsSource, WatermarkStrategy.<String>forMonotonousTimestamps().withIdleness(Duration.ofSeconds(1)), "DynamoDB Streams source")
+        .returns(TypeInformation.of(String.class))
+        .uid("custom-uid");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+// Configure the DynamodbStreamsSource
+val sourceConfig = new Configuration()
+sourceConfig.set(DynamodbStreamsSourceConfigConstants.STREAM_INITIAL_POSITION, DynamodbStreamsSourceConfigConstants.InitialPosition.TRIM_HORIZON) // This is optional, by default connector will read from LATEST
+
+// Create a new DynamoDbStreamsSource to read from the specified DynamoDB Stream.
+val dynamoDbStreamsSource = DynamoDbStreamsSource.builder[String]()
+  .setStreamArn("arn:aws:dynamodb:us-east-1:1231231230:table/test/stream/2024-04-11T07:14:19.380")
+  .setSourceConfig(sourceConfig)
+  // User must implement their own deserialization schema to translate change data capture events into custom data types
+  .setDeserializationSchema(dynamodbDeserializationSchema)
+  .build()
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+// Specify watermarking strategy and the name of the DynamoDB Streams Source operator.
+// Specify return type using TypeInformation.
+// Specify also UID of operator in line with Flink best practice.

Review Comment:
   Removed also
