[ https://issues.apache.org/jira/browse/FLINK-35500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851162#comment-17851162 ]

Aleksandr Pilipenko commented on FLINK-35500:
---------------------------------------------

{quote}
Yes, I agree; however, it seems that by design the DynamoDbWriter is responsible 
for overriding partition keys propagated from the Table API DynamicSinkFactory. 
I believe it would be confusing to split this responsibility between the 
element converter and the writer.
{quote}
True; however, overwriteByPartitionKeys is used for request deduplication and 
is not strictly required to contain only the table keys.
Regardless, I believe the ElementConverter[1] should distinguish between PUT 
and DELETE requests when populating the item field of the DynamoDbWriteRequest 
object; currently only the type differs between these two cases.
To do that, the converter will need information about the table keys so that it 
can include only the required fields (see the sketch below).

[1] - 
https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java#L48-L72
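
A rough sketch of what I mean (not the actual implementation): it assumes the 
converter is handed the primary-key field names (e.g. from the table factory), 
and toAttributeValueMap is a hypothetical stand-in for the existing 
RowData-to-attribute-map conversion:

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

// Sketch: emit the full item for PUT, but only the key attributes for DELETE.
public class KeyAwareRowDataElementConverter
        implements ElementConverter<RowData, DynamoDbWriteRequest> {

    private final Set<String> primaryKeyFieldNames; // e.g. ["userId"], from the factory

    public KeyAwareRowDataElementConverter(Set<String> primaryKeyFieldNames) {
        this.primaryKeyFieldNames = primaryKeyFieldNames;
    }

    @Override
    public DynamoDbWriteRequest apply(RowData element, SinkWriter.Context context) {
        Map<String, AttributeValue> item = toAttributeValueMap(element);
        if (element.getRowKind() == RowKind.DELETE) {
            // Keep only the key attributes so the request matches the key schema.
            Map<String, AttributeValue> key = new HashMap<>(item);
            key.keySet().retainAll(primaryKeyFieldNames);
            return DynamoDbWriteRequest.builder()
                    .setType(DynamoDbWriteRequestType.DELETE)
                    .setItem(key)
                    .build();
        }
        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(item)
                .build();
    }

    // Hypothetical helper standing in for the actual RowData -> attribute map conversion.
    private Map<String, AttributeValue> toAttributeValueMap(RowData element) {
        throw new UnsupportedOperationException("sketch only");
    }
}
{code}

With something like this in place, the writer could pass the converted item 
through as the DeleteRequest key unchanged.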

> DynamoDb Table API Sink fails to delete elements due to key not found
> ---------------------------------------------------------------------
>
>                 Key: FLINK-35500
>                 URL: https://issues.apache.org/jira/browse/FLINK-35500
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / DynamoDB
>    Affects Versions: aws-connector-4.0.0, aws-connector-4.1.0, 
> aws-connector-4.2.0
>            Reporter: Ahmed Hamdy
>            Priority: Major
>             Fix For: aws-connector-4.4.0
>
>
> h2. Description
> When DynamoDbSink is used with CDC sources, it fails to process {{DELETE}} 
> records and throws 
> {quote}org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
>  The provided key element does not match the schema{quote}
> This is due to {{DynamoDbSinkWriter}} passing the whole DynamoDB item as the 
> key instead of the constructed primary key[1], as illustrated below.
> Note: this issue was reported on the user mailing list[2].
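> A minimal illustration of the mismatch (a sketch using AWS SDK v2 model 
> classes, with illustrative values for the {{orders}} table from the steps 
> below, whose hash key is {{userId}}):
> {code:java}
> import java.util.Map;
>
> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
> import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
>
> class DeleteKeyMismatch {
>     public static void main(String[] args) {
>         Map<String, AttributeValue> fullItem = Map.of(
>                 "orderId", AttributeValue.builder().n("3").build(),
>                 "price", AttributeValue.builder().n("9").build(),
>                 "userId", AttributeValue.builder().s("c").build());
>
>         // What the writer sends today: the whole item as the key.
>         // DynamoDB rejects it: "The provided key element does not match the schema".
>         DeleteRequest broken = DeleteRequest.builder().key(fullItem).build();
>
>         // What the table's key schema requires: only the key attribute(s).
>         DeleteRequest valid = DeleteRequest.builder()
>                 .key(Map.of("userId", fullItem.get("userId")))
>                 .build();
>     }
> }
> {code}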
> h2. Steps to Reproduce
> (1) Create a new DynamoDB table in AWS.  Command line:
> aws dynamodb create-table \
>   --table-name orders \
>   --attribute-definitions AttributeName=userId,AttributeType=S \
>   --key-schema AttributeName=userId,KeyType=HASH \
>   --billing-mode PAY_PER_REQUEST
> (2) Create an input file in Debezium-JSON format with the following rows to 
> start:
> {"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
> {"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
> {"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
> {"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
> (3) Start the Flink SQL Client, and run the following, substituting in the 
> proper local paths for the Dynamo Connector JAR file and for this local 
> sample input file:
> ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
> SET 'execution.runtime-mode' = 'streaming';
> SET 'sql-client.execution.result-mode' = 'changelog';
> CREATE TABLE Orders_CDC(
>   orderId BIGINT,
>   price FLOAT,
>   userId STRING
>  ) WITH (
>    'connector' = 'filesystem',
>    'path' = '/path/to/input_file.jsonl',
>    'format' = 'debezium-json'
>  );
> CREATE TABLE Orders_Dynamo (
>   orderId BIGINT,
>   price FLOAT,
>   userId STRING,
>   PRIMARY KEY (userId) NOT ENFORCED
> ) PARTITIONED BY ( userId )
> WITH (
>   'connector' = 'dynamodb',
>   'table-name' = 'orders',
>   'aws.region' = 'us-east-1'
> );
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
> (4) At this point, everything works as expected: all 4 rows are inserted into 
> DynamoDB, because they are "Insert" operations. So far, so good!
> (5) Now add the following row to the input file. It represents a deletion in 
> Debezium format, which should cause a corresponding delete on the DynamoDB 
> table:
> {"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
> (6) Re-run the INSERT statement; this time it fails with the 
> {{DynamoDbException}} shown above:
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
> h3. References
> 1 - https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
> 2 - https://lists.apache.org/thread/ysvctpvn6n9kn0qlf5b24gxchfg64ylf


