[ https://issues.apache.org/jira/browse/FLINK-35500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851241#comment-17851241 ]

Rob Goretsky commented on FLINK-35500:
--------------------------------------

I'll also admit that I haven't written Java (professionally) in about 20 years 
(yes, back when it was called J2SE!). That said, most of this code seems 
pretty intuitive; I'd probably just need some pointers on the style 
guidelines and perhaps on test configuration.

> DynamoDb Table API Sink fails to delete elements due to key not found
> ---------------------------------------------------------------------
>
>                 Key: FLINK-35500
>                 URL: https://issues.apache.org/jira/browse/FLINK-35500
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / DynamoDB
>    Affects Versions: aws-connector-4.0.0, aws-connector-4.1.0, aws-connector-4.2.0
>            Reporter: Ahmed Hamdy
>            Priority: Major
>             Fix For: aws-connector-4.4.0
>
>
> h2. Description
> When DynamoDbSink is used with CDC sources, it fails to process {{DELETE}} 
> records and throws:
> {quote}org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException: The provided key element does not match the schema{quote}
> This is because {{DynamoDbSinkWriter}} passes the whole DynamoDB item as the 
> key instead of the constructed primary key [1]; a minimal sketch of the 
> mismatch follows below.
> Note: the issue was reported on the user mailing list [2].
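> As an illustration only (the helper and the {{primaryKeyAttributeNames}} set are hypothetical, not the connector's actual fields; the real code is at [1]), the fix presumably needs to narrow the item down to the table's key attributes before building the {{DeleteRequest}}:
> {code:java}
> import java.util.Map;
> import java.util.Set;
> import java.util.stream.Collectors;
>
> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
> import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
> import software.amazon.awssdk.services.dynamodb.model.WriteRequest;
>
> class KeyExtractionSketch {
>     // Hypothetical helper, sketching the shape of the problem.
>     static WriteRequest toDeleteRequest(Map<String, AttributeValue> item,
>                                         Set<String> primaryKeyAttributeNames) {
>         // Buggy behaviour: passing the full item as the key, i.e.
>         // DeleteRequest.builder().key(item), makes DynamoDB reject the
>         // request with "The provided key element does not match the schema".
>
>         // Expected behaviour: keep only the attributes that form the
>         // table's primary key (just "userId" in the repro below).
>         Map<String, AttributeValue> key = item.entrySet().stream()
>                 .filter(e -> primaryKeyAttributeNames.contains(e.getKey()))
>                 .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
>         return WriteRequest.builder()
>                 .deleteRequest(DeleteRequest.builder().key(key).build())
>                 .build();
>     }
> }
> {code}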
> h2. Steps to Reproduce
> (1) Create a new DynamoDB table in AWS. Command line:
> {code}
> aws dynamodb create-table \
>   --table-name orders \
>   --attribute-definitions AttributeName=userId,AttributeType=S \
>   --key-schema AttributeName=userId,KeyType=HASH \
>   --billing-mode PAY_PER_REQUEST
> {code}
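> (Aside: since the key schema is just {{userId}}, any delete against this table must supply exactly that one attribute. A minimal AWS SDK v2 sketch of a valid delete, assuming default credentials and region, with illustrative values:)
> {code:java}
> import java.util.Map;
>
> import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
> import software.amazon.awssdk.services.dynamodb.model.DeleteItemRequest;
>
> class ValidDeleteSketch {
>     public static void main(String[] args) {
>         try (DynamoDbClient client = DynamoDbClient.create()) {
>             // The key map must contain the key-schema attributes and nothing
>             // else; adding orderId or price here reproduces the
>             // "provided key element does not match the schema" error.
>             client.deleteItem(DeleteItemRequest.builder()
>                     .tableName("orders")
>                     .key(Map.of("userId", AttributeValue.builder().s("c").build()))
>                     .build());
>         }
>     }
> }
> {code}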
> (2) Create an input file in Debezium-JSON format with the following rows to 
> start:
> {code}
> {"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
> {"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
> {"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
> {"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
> {code}
> (3) Start the Flink SQL Client and run the following, substituting the 
> proper local paths for the DynamoDB connector JAR file and for the local 
> sample input file:
> {code:sql}
> ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
> SET 'execution.runtime-mode' = 'streaming';
> SET 'sql-client.execution.result-mode' = 'changelog';
>
> CREATE TABLE Orders_CDC (
>   orderId BIGINT,
>   price FLOAT,
>   userId STRING
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = '/path/to/input_file.jsonl',
>   'format' = 'debezium-json'
> );
>
> CREATE TABLE Orders_Dynamo (
>   orderId BIGINT,
>   price FLOAT,
>   userId STRING,
>   PRIMARY KEY (userId) NOT ENFORCED
> ) PARTITIONED BY ( userId )
> WITH (
>   'connector' = 'dynamodb',
>   'table-name' = 'orders',
>   'aws.region' = 'us-east-1'
> );
>
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
> {code}
> (4) At this point, everything works: all 4 rows are inserted into DynamoDB, 
> because they are "insert" operations. So far, so good!
> (5) Now, add the following row to the input file. It represents a deletion 
> in Debezium format, which should cause a delete on the corresponding 
> DynamoDB table:
> {code}
> {"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
> {code}
> (6) Re-run the SQL statement:
> {code:sql}
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC;
> {code}
> This run fails on the delete record with the {{DynamoDbException}} quoted 
> above, because the sink sends the entire item (orderId, price, userId) as 
> the key rather than just userId.
> h3. References
> [1] https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
> [2] https://lists.apache.org/thread/ysvctpvn6n9kn0qlf5b24gxchfg64ylf



