nicusX commented on code in PR #152:
URL: https://github.com/apache/flink-connector-aws/pull/152#discussion_r1759682631


##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java:
##########
@@ -58,6 +58,17 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                         .setDynamoDbClientProperties(
                                 dynamoDbConfiguration.getSinkClientProperties());
 
+        if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()) {
+            builder =
+                    builder.setPrimaryKeys(
+                            new HashSet<>(

Review Comment:
   +1
   It must be an ordered set/list of 1 or 2 elements.
   
   Suggestion: if not too complicated, from a user's perspective it would be clearer to pass a *PrimaryKey* object with two fields, *partitionKey* and *sortKey*, where *partitionKey* is required and *sortKey* is optional.



##########
docs/content/docs/connectors/table/dynamodb.md:
##########
@@ -303,6 +303,46 @@ WITH (
 );
 ```
 
+## Specifying Primary Key For Deletes
+
+The DynamoDB sink supports Delete requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s); otherwise the Delete request fails with `DynamoDbException: The provided key element does not match the schema`.
+Thus, if a changelog stream that contains DELETEs is being written to DynamoDB, you must specify the `PRIMARY KEY` on the table.
+The `PRIMARY KEY` specified for the Flink SQL table must match the actual primary key of the DynamoDB table: either just the Partition Key, or, in the case of a composite primary key, the Partition Key and the Sort Key.
+
+Example with the Partition Key as the sole Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) 
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Example with the Partition Key and Sort Key as a composite Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  PRIMARY KEY (user_id, item_id) NOT ENFORCED
+) 
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Note that this Primary Key functionality, specified via `PRIMARY KEY`, can be used alongside the Sink Partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs.

Review Comment:
   I would clarify that `PARTITIONED BY` is used by Flink for deduplication within the same batch (and to support deletes), but it is different from DynamoDB's *partitionKey*. If the user is familiar with DynamoDB but not so much with Flink, this may generate a lot of confusion.



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSinkFactory.java:
##########
@@ -58,6 +58,17 @@ public DynamicTableSink createDynamicTableSink(Context context) {
                         .setDynamoDbClientProperties(
                                 dynamoDbConfiguration.getSinkClientProperties());
 
+        if (catalogTable.getResolvedSchema().getPrimaryKey().isPresent()) {

Review Comment:
   +1
   Also, I'd add a check that, if `PARTITIONED BY` and `PRIMARY KEY` are both specified, they must be identical.
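   A hedged sketch of the consistency check this suggests, assuming the factory sees the `PARTITIONED BY` keys as an ordered list and the `PRIMARY KEY` columns as a set (the class and method names are illustrative, not from the PR):

   ```java
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;

   // Illustrative cross-check: when both PARTITIONED BY and PRIMARY KEY
   // are declared, reject the table definition unless they name the same
   // columns. Skips the check when either is absent.
   public final class KeyConsistencyCheck {
       public static void validate(List<String> partitionKeys, Set<String> primaryKeys) {
           if (partitionKeys.isEmpty() || primaryKeys.isEmpty()) {
               return; // nothing to cross-check when only one is specified
           }
           if (!new LinkedHashSet<>(partitionKeys).equals(primaryKeys)) {
               throw new IllegalArgumentException(
                       "PARTITIONED BY keys " + partitionKeys
                               + " must be identical to PRIMARY KEY columns " + primaryKeys);
           }
       }
   }
   ```

   Such a check would surface the mismatch at table-creation time rather than as a `DynamoDbException` at write time.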



##########
docs/content/docs/connectors/table/dynamodb.md:
##########
@@ -303,6 +303,46 @@ WITH (
 );
 ```
 
+## Specifying Primary Key For Deletes
+
+The DynamoDB sink supports Delete requests to DynamoDB, but AWS requires that a DynamoDB Delete request contain **only** the key field(s); otherwise the Delete request fails with `DynamoDbException: The provided key element does not match the schema`.
+Thus, if a changelog stream that contains DELETEs is being written to DynamoDB, you must specify the `PRIMARY KEY` on the table.
+The `PRIMARY KEY` specified for the Flink SQL table must match the actual primary key of the DynamoDB table: either just the Partition Key, or, in the case of a composite primary key, the Partition Key and the Sort Key.
+
+Example with the Partition Key as the sole Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  PRIMARY KEY (user_id) NOT ENFORCED
+) 
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Example with the Partition Key and Sort Key as a composite Primary Key:
+```sql
+CREATE TABLE DynamoDbTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  PRIMARY KEY (user_id, item_id) NOT ENFORCED
+) 
+WITH (
+  'connector' = 'dynamodb',
+  'table-name' = 'user_behavior',
+  'aws.region' = 'us-east-2'
+);
+```
+
+Note that this Primary Key functionality, specified via `PRIMARY KEY`, can be used alongside the Sink Partitioning mentioned above via `PARTITIONED BY` to deduplicate data and support DELETEs.

Review Comment:
   Also, `PARTITIONED BY` and `PRIMARY KEY` should always be the same.
   (see other comment by @dzikosc below)



##########
flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/DynamoDbDynamicSink.java:
##########
@@ -62,7 +63,8 @@ protected DynamoDbDynamicSink(
             boolean failOnError,
             Properties dynamoDbClientProperties,
             DataType physicalDataType,
-            Set<String> overwriteByPartitionKeys) {
+            Set<String> overwriteByPartitionKeys,
+            Set<String> primaryKeys) {

Review Comment:
   +1



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
