nicusX commented on code in PR #152:
URL: https://github.com/apache/flink-connector-aws/pull/152#discussion_r1759682631
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java:
##########
@@ -58,6 +58,17 @@ public DynamicTableSink createDynamicTableSink(Context context) {
.setDynamoDbClientProperties(
dynamoDbConfiguration.getSinkClientProperties());
+ if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()) {
+ builder =
+ builder.setPrimaryKeys(
+ new HashSet<>(
Review Comment:
+1
It must be an ordered set/list of 1 or 2 elements.
Suggestion: if not too complicated, from a user's perspective it would be clearer to simply pass a *PrimaryKey* object with two fields, *partitionKey* and *sortKey*, where *partitionKey*
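A minimal sketch of what such a value object could look like; the class and field names (`PrimaryKey`, `partitionKey`, `sortKey`) follow the reviewer's suggestion and are illustrative, not the connector's actual API:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the suggested value object: a required partition key
// and an optional sort key, exposed in DynamoDB key order.
final class PrimaryKey {
    private final String partitionKey; // required
    private final String sortKey;      // optional; null when absent

    PrimaryKey(String partitionKey) {
        this(partitionKey, null);
    }

    PrimaryKey(String partitionKey, String sortKey) {
        if (partitionKey == null || partitionKey.isEmpty()) {
            throw new IllegalArgumentException("partitionKey is required");
        }
        this.partitionKey = partitionKey;
        this.sortKey = sortKey;
    }

    String getPartitionKey() {
        return partitionKey;
    }

    Optional<String> getSortKey() {
        return Optional.ofNullable(sortKey);
    }

    // DynamoDB key order: partition key first, then the optional sort key.
    List<String> asOrderedList() {
        return sortKey == null
                ? List.of(partitionKey)
                : List.of(partitionKey, sortKey);
    }
}
```

This makes the 1-or-2-element ordering constraint impossible to violate, since the type itself encodes it.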
##########
docs/content/docs/connectors/table/dynamodb.md:
##########
@@ -303,6 +303,46 @@ WITH (
);
```
+## Specifying Primary Key For Deletes
+
+The DynamoDB sink supports Delete requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s); otherwise the Delete request will fail with `DynamoDbException: The provided key element does not match the schema`.
+Thus, if a changelog stream containing DELETEs is being written to DynamoDB, you must specify a `PRIMARY KEY` on the table.
+The `PRIMARY KEY` specified for the Flink SQL table must match the actual primary key of the DynamoDB table: either just the Partition Key, or, in the case of a composite primary key, the Partition Key and Sort Key.
+
+Example for Partition Key as the only Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `category_id` BIGINT,
+ `behavior` STRING,
+ PRIMARY KEY (user_id) NOT ENFORCED
+)
+WITH (
+ 'connector' = 'dynamodb',
+ 'table-name' = 'user_behavior',
+ 'aws.region' = 'us-east-2'
+);
+```
+
+Example for Partition Key and Sort Key as a Composite Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `category_id` BIGINT,
+ `behavior` STRING,
+ PRIMARY KEY (user_id, item_id) NOT ENFORCED
+)
+WITH (
+ 'connector' = 'dynamodb',
+ 'table-name' = 'user_behavior',
+ 'aws.region' = 'us-east-2'
+);
+```
+
+Note that this Primary Key functionality, specified by `PRIMARY KEY`, can be used alongside the Sink Partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs.
Review Comment:
I would clarify that `PARTITIONED BY` is used by Flink for deduplication within the same batch (and to support deletes), but it is different from DynamoDB's *partitionKey*. If the user is familiar with DynamoDB but not so much with Flink, this may generate a lot of confusion.
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java:
##########
@@ -58,6 +58,17 @@ public DynamicTableSink createDynamicTableSink(Context context) {
.setDynamoDbClientProperties(
dynamoDbConfiguration.getSinkClientProperties());
+ if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()) {
Review Comment:
+1
Also, I'd add a check that, if `PARTITIONED BY` and `PRIMARY KEY` are both specified, they must be identical.
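A minimal sketch of such a check, assuming the declared columns are available as ordered lists; the class and method names here are hypothetical, not the connector's existing code:

```java
import java.util.List;

// Hypothetical consistency check: when both PARTITIONED BY and PRIMARY KEY
// are declared, require them to name the same columns in the same order.
final class KeyConsistencyCheck {
    static void validateKeysMatch(
            List<String> partitionedByColumns, List<String> primaryKeyColumns) {
        if (!partitionedByColumns.isEmpty()
                && !primaryKeyColumns.isEmpty()
                && !partitionedByColumns.equals(primaryKeyColumns)) {
            throw new IllegalArgumentException(
                    "PARTITIONED BY columns " + partitionedByColumns
                            + " must match PRIMARY KEY columns " + primaryKeyColumns);
        }
    }
}
```

Failing fast here surfaces the mismatch at table creation instead of as a runtime `DynamoDbException` on the first Delete request.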
##########
docs/content/docs/connectors/table/dynamodb.md:
##########
@@ -303,6 +303,46 @@ WITH (
);
```
+## Specifying Primary Key For Deletes
+
+The DynamoDB sink supports Delete requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s); otherwise the Delete request will fail with `DynamoDbException: The provided key element does not match the schema`.
+Thus, if a changelog stream containing DELETEs is being written to DynamoDB, you must specify a `PRIMARY KEY` on the table.
+The `PRIMARY KEY` specified for the Flink SQL table must match the actual primary key of the DynamoDB table: either just the Partition Key, or, in the case of a composite primary key, the Partition Key and Sort Key.
+
+Example for Partition Key as the only Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `category_id` BIGINT,
+ `behavior` STRING,
+ PRIMARY KEY (user_id) NOT ENFORCED
+)
+WITH (
+ 'connector' = 'dynamodb',
+ 'table-name' = 'user_behavior',
+ 'aws.region' = 'us-east-2'
+);
+```
+
+Example for Partition Key and Sort Key as a Composite Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+ `user_id` BIGINT,
+ `item_id` BIGINT,
+ `category_id` BIGINT,
+ `behavior` STRING,
+ PRIMARY KEY (user_id, item_id) NOT ENFORCED
+)
+WITH (
+ 'connector' = 'dynamodb',
+ 'table-name' = 'user_behavior',
+ 'aws.region' = 'us-east-2'
+);
+```
+
+Note that this Primary Key functionality, specified by `PRIMARY KEY`, can be used alongside the Sink Partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs.
Review Comment:
Also, `PARTITIONED BY` and `PRIMARY KEY` should always be the same.
(see other comment by @dzikosc below)
##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java:
##########
@@ -62,7 +63,8 @@ protected DynamoDbDynamicSink(
boolean failOnError,
Properties dynamoDbClientProperties,
DataType physicalDataType,
- Set<String> overwriteByPartitionKeys) {
+ Set<String> overwriteByPartitionKeys,
+ Set<String> primaryKeys) {
Review Comment:
+1
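Since the new parameter is typed `Set<String>` but an earlier comment notes the keys must be ordered (partition key first, then sort key), one way to satisfy both is a `LinkedHashSet`; a small sketch, with an illustrative helper name:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// A plain HashSet does not guarantee iteration order, so it could yield the
// sort key before the partition key. LinkedHashSet preserves insertion order
// while still matching the Set<String> parameter type.
final class OrderedKeys {
    static Set<String> orderedPrimaryKeys(List<String> keysInOrder) {
        return new LinkedHashSet<>(keysInOrder);
    }
}
```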
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]