[ 
https://issues.apache.org/jira/browse/FLINK-35500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851194#comment-17851194
 ] 

Rob Goretsky commented on FLINK-35500:
--------------------------------------

With regard to determining the proper Primary Key fields to pass in the 
DynamoDB Delete request, I had initially considered reusing the existing 
"overwriteByPartitionKeys" value from the Table configuration.  However, 
"overwriteByPartitionKeys" has a very specific documented meaning, "Used to 
deduplicate write requests within each batch pushed to DynamoDB", so reusing 
it for a different purpose (determining the PK for DynamoDB Delete requests) 
would be confusing.

[~a.pilipenko]'s proposal that we'd instead:
{quote}get information on table primary keys in DynamicSinkFactory and provide 
it to element converter used by Table API sink implementation. Assumption here 
is that partition and sort keys of DDB table would be mirrored as primary keys 
on Flink SQL side.
{quote}
does make sense in that regard.  We could document that any PRIMARY KEY 
fields specified on the Flink SQL Table will be used when making DELETE 
requests to DynamoDB.  The drawback is that there would now be two places 
where you might need to specify the Primary Key fields: in a PRIMARY KEY 
clause and also in the PARTITIONED BY clause.  They would serve different 
purposes, though, and depending on your use case you would not always need to 
specify both.  What are your thoughts on that?

 

On how we could make DELETEs work properly once we do know the Primary Key 
fields:

[~chalixar] - You mentioned:
{quote}I am not sure I follow, even if the element converter did set the 
primary key tag in the element, we still use the complete item as a key in 
delete requests from writer[2], is this behaviour normal?
{quote}
I think there are two possible places we could make the fix, which I list 
below as Option 1 and Option 2:

*Option 1* - DynamoDbWriteRequest [1] is part of the public interface exposed 
by this DynamoDB connector, and is the class that users must map their 
DataStream records into when using a Custom Element Converter.  
DynamoDbWriteRequest currently only has the concepts of an "Item" and a 
"Type", and no explicit Key setters/getters.  So we'd adjust 
DynamoDbWriteRequest to have the concept of a Key.  Then, in the 
DynamoDbSinkWriter, we could adjust how DELETE requests are created, so that 
instead of doing:

{{DeleteRequest.builder().key(dynamoDbWriteRequest.getItem()).build()}}

It would instead do:

{{DeleteRequest.builder().key(dynamoDbWriteRequest.getKey()).build()}}

This would create a new responsibility for anyone implementing a Custom Element 
Converter: they would have to set this Key field (using a new setKey method 
we'd add to DynamoDbWriteRequest).  It could also be a breaking change for 
anyone who does not set it and expects DELETE operations to work properly 
based on the Item itself.
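
To make Option 1 a bit more concrete, here is a rough sketch of the idea, not 
the connector's actual code.  KeyedWriteRequest is a hypothetical stand-in for 
DynamoDbWriteRequest with the proposed key added, attribute values are plain 
Strings rather than the SDK's AttributeValue, and deleteKeyFor is a 
hypothetical helper that models what the writer would pass to 
DeleteRequest.builder().key(...):

```java
import java.util.Map;

// Hypothetical stand-in for DynamoDbWriteRequest, extended with a key.
// Attribute values are plain Strings here instead of the SDK's AttributeValue.
final class KeyedWriteRequest {
    enum Type { PUT, DELETE }

    private final Map<String, String> item;
    private final Map<String, String> key; // the proposed new field
    private final Type type;

    KeyedWriteRequest(Map<String, String> item, Map<String, String> key, Type type) {
        this.item = item;
        this.key = key;
        this.type = type;
    }

    Map<String, String> getItem() { return item; }
    Map<String, String> getKey() { return key; }
    Type getType() { return type; }

    // Models the value the writer would pass to DeleteRequest.builder().key(...).
    static Map<String, String> deleteKeyFor(KeyedWriteRequest request) {
        if (request.getType() != Type.DELETE) {
            throw new IllegalArgumentException("Not a DELETE request");
        }
        Map<String, String> key = request.getKey();
        if (key == null || key.isEmpty()) {
            // A converter that never sets the key would now fail here.
            throw new IllegalStateException("DELETE request has no key set");
        }
        return key;
    }
}
```

The empty-key check is where the possible breaking change shows up: a Custom 
Element Converter that only sets the Item would need to be updated before its 
DELETE requests could be built this way.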

 

*Option 2* - We could adjust the DELETE logic in RowDataElementConverter [3] to 
only include the Primary Key fields in the DynamoDbWriteRequest when calling 
setItem(), rather than including all fields with GetItem as is currently done 
here [4].  The good thing about this approach is that we do not introduce any 
breaking change or new requirement to call setKey() when constructing a 
DynamoDbWriteRequest with a Custom Element Converter.
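
As a rough illustration of the projection Option 2 relies on, here is a 
minimal sketch under some assumptions: the item is modelled as a plain Map, 
the Primary Key field names are already known (for example from the PRIMARY 
KEY clause), and PrimaryKeyProjection / keyOnly are hypothetical names, not 
existing connector code:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: keeps only the Primary Key attributes of an item.
final class PrimaryKeyProjection {
    static <V> Map<String, V> keyOnly(Map<String, V> item, List<String> primaryKeyFields) {
        Map<String, V> key = new LinkedHashMap<>();
        for (String field : primaryKeyFields) {
            V value = item.get(field);
            if (value == null) {
                // A DELETE cannot be addressed without every key attribute.
                throw new IllegalArgumentException(
                        "Primary key field missing from item: " + field);
            }
            key.put(field, value);
        }
        return key;
    }
}
```

For the deleted row from the issue description (orderId 3, userId "c", price 
9) with userId as the only Primary Key field, this would keep just the userId 
attribute, which is what the DELETE request would then carry as its Item.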

I think Option 2 seems cleaner and simpler, and keeps the surface of this 
change limited to just the RowDataElementConverter, but am open to thoughts or 
other options here too!

 

1 - 
[https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java]

2 - 
[https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267]

3 - 
[https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java#L63-L65]

4 - 
[https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java#L54-L56]

 

 

> DynamoDb Table API Sink fails to delete elements due to key not found
> ---------------------------------------------------------------------
>
>                 Key: FLINK-35500
>                 URL: https://issues.apache.org/jira/browse/FLINK-35500
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / DynamoDB
>    Affects Versions: aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0
>            Reporter: Ahmed Hamdy
>            Priority: Major
>             Fix For: aws-connector-4.4.0
>
>
> h2. Description
> When DynamoDbSink is used with CDC sources, it fails to process {{DELETE}} 
> records and throws 
> {quote}org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
>  The provided key element does not match the schema{quote}
> This is due to {{DynamoDbSinkWriter}} passing the whole DynamoDb Item as key 
> instead of the constructed primary Key[1].
> Note: The issue is reported in user mailing list[2]
> h2. Steps to Reproduce
> (1) Create a new DynamoDB table in AWS.  Command line:
> aws dynamodb create-table \
>   --table-name orders \
>   --attribute-definitions AttributeName=userId,AttributeType=S \
>   --key-schema AttributeName=userId,KeyType=HASH \
>   --billing-mode PAY_PER_REQUEST
> (2) Create an input file in Debezium-JSON format with the following rows to 
> start:
> {"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
> {"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
> {"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
> {"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
> (3) Start the Flink SQL Client, and run the following, substituting in the 
> proper local paths for the Dynamo Connector JAR file and for this local 
> sample input file:
> ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
> SET 'execution.runtime-mode' = 'streaming';
> SET 'sql-client.execution.result-mode' = 'changelog';
> CREATE TABLE Orders_CDC(
>   orderId BIGINT,
>   price float,
>   userId STRING
>  ) WITH (
>    'connector' = 'filesystem',
>    'path' = '/path/to/input_file.jsonl',
>    'format' = 'debezium-json'
>  );
> CREATE TABLE Orders_Dynamo (
>   orderId BIGINT,
>   price float,
>   userId STRING,
>   PRIMARY KEY (userId) NOT ENFORCED
> ) PARTITIONED BY ( userId )
> WITH (
>   'connector' = 'dynamodb',
>   'table-name' = 'orders',
>   'aws.region' = 'us-east-1'
> );
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
> (4) At this point, we will see that things currently all work properly, and 
> these 4 rows are inserted properly to Dynamo, because they are "Insert" 
> operations.   So far, so good!
> (5) Now, add the following row to the input file.  This represents a deletion 
> in Debezium format, which should then cause a Deletion on the corresponding 
> DynamoDB table:
> {"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
> (6) Re-Run the SQL statement:
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
> h3. References
> 1-https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
> 2- https://lists.apache.org/thread/ysvctpvn6n9kn0qlf5b24gxchfg64ylf



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
