[ https://issues.apache.org/jira/browse/FLINK-35500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hong Liang Teoh updated FLINK-35500:
------------------------------------
    Fix Version/s: aws-connector-5.1.0

> DynamoDb Table API Sink fails to delete elements due to key not found
> ---------------------------------------------------------------------
>
>                 Key: FLINK-35500
>                 URL: https://issues.apache.org/jira/browse/FLINK-35500
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / DynamoDB
>    Affects Versions: aws-connector-4.0.0, aws-connector-4.1.0, aws-connector-4.2.0
>            Reporter: Ahmed Hamdy
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: aws-connector-4.4.0, aws-connector-5.1.0
>
> h2. Description
>
> When the DynamoDbSink is used with CDC sources, it fails to process {{DELETE}} records and throws
> {quote}org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The provided key element does not match the schema{quote}
> This happens because {{DynamoDbSinkWriter}} passes the whole DynamoDB item as the key instead of the constructed primary key [1].
> Note: the issue was reported on the user mailing list [2].
>
> h2. Steps to Reproduce
>
> (1) Create a new DynamoDB table in AWS. Command line:
>
> aws dynamodb create-table \
>     --table-name orders \
>     --attribute-definitions AttributeName=userId,AttributeType=S \
>     --key-schema AttributeName=userId,KeyType=HASH \
>     --billing-mode PAY_PER_REQUEST
>
> (2) Create an input file in Debezium-JSON format with the following rows to start:
>
> {"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
> {"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
> {"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
> {"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
>
> (3) Start the Flink SQL Client and run the following, substituting the proper local paths for the DynamoDB connector JAR file and for this local sample input file:
>
> ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
> SET 'execution.runtime-mode' = 'streaming';
> SET 'sql-client.execution.result-mode' = 'changelog';
>
> CREATE TABLE Orders_CDC (
>     orderId BIGINT,
>     price FLOAT,
>     userId STRING
> ) WITH (
>     'connector' = 'filesystem',
>     'path' = '/path/to/input_file.jsonl',
>     'format' = 'debezium-json'
> );
>
> CREATE TABLE Orders_Dynamo (
>     orderId BIGINT,
>     price FLOAT,
>     userId STRING,
>     PRIMARY KEY (userId) NOT ENFORCED
> ) PARTITIONED BY (userId)
> WITH (
>     'connector' = 'dynamodb',
>     'table-name' = 'orders',
>     'aws.region' = 'us-east-1'
> );
>
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
>
> (4) At this point everything works: all 4 rows are inserted into the DynamoDB table correctly, because they are insert operations. So far, so good!
>
> (5) Now add the following row to the input file. It represents a deletion in Debezium format, which should cause a delete on the corresponding DynamoDB item:
>
> {"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
>
> (6) Re-run the SQL statement:
>
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
>
> The job now fails with the DynamoDbException quoted above instead of deleting the row.
>
> h3. References
>
> [1] https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
> [2] https://lists.apache.org/thread/ysvctpvn6n9kn0qlf5b24gxchfg64ylf

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
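
For illustration, a minimal sketch of the key projection the description implies: on a DELETE, only the table's key attributes should be sent as the DeleteRequest key, not the full item. This is not the connector's actual patch; it uses the unshaded AWS SDK v2 model classes rather than the connector's shaded ones, and deleteRequestFor / keyAttributeNames are hypothetical names introduced here, not the connector's real API.

    import java.util.Map;
    import java.util.Set;
    import java.util.stream.Collectors;

    import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
    import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
    import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

    class DeleteKeySketch {
        // Builds a WriteRequest whose DeleteRequest key contains only the
        // table's key attributes. Sending the full item as the key is what
        // triggers "The provided key element does not match the schema".
        static WriteRequest deleteRequestFor(
                Map<String, AttributeValue> item, Set<String> keyAttributeNames) {
            Map<String, AttributeValue> key = item.entrySet().stream()
                    .filter(e -> keyAttributeNames.contains(e.getKey()))
                    .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
            return WriteRequest.builder()
                    .deleteRequest(DeleteRequest.builder().key(key).build())
                    .build();
        }
    }

In the reproduction above, keyAttributeNames would contain only "userId", so the delete for row 3 would send just {userId: "c"} as the key instead of the full {orderId, userId, price} item that triggers the schema mismatch.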