[
https://issues.apache.org/jira/browse/FLINK-35500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851131#comment-17851131
]
Aleksandr Pilipenko edited comment on FLINK-35500 at 5/31/24 3:21 PM:
----------------------------------------------------------------------
Looking at this issue, it appears that the problem is in the Table API element
converter implementation.
RowDataElementConverter[1] does not have information about the DDB table's
partition and sort keys, nor about the primary keys of the table on the Flink side.
This is less of an issue on the DataStream API side, since the default element
converter only supports insert (PUT) operations[2] and users can implement the
correct mapping in a custom element converter.
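For example, a DataStream API user can already work around the delete case today with a custom element converter that sends only the key attributes for deletions. A minimal sketch (not connector code; the {{Order}} type and the {{userId}} partition key are assumptions for illustration):
{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequest;
import org.apache.flink.connector.dynamodb.sink.DynamoDbWriteRequestType;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

/** Sketch only: a custom converter that emits DELETE requests carrying just the key attributes. */
public class OrderElementConverter
        implements ElementConverter<OrderElementConverter.Order, DynamoDbWriteRequest> {

    /** Hypothetical input type used only for this sketch. */
    public static class Order {
        public String userId;   // DDB partition key in this example
        public long orderId;
        public double price;
        public boolean delete;  // true when the upstream change event is a deletion
    }

    @Override
    public DynamoDbWriteRequest apply(Order order, SinkWriter.Context context) {
        Map<String, AttributeValue> item = new HashMap<>();
        item.put("userId", AttributeValue.builder().s(order.userId).build());
        if (order.delete) {
            // A DELETE request must contain only the key schema attributes of the DDB table.
            return DynamoDbWriteRequest.builder()
                    .setType(DynamoDbWriteRequestType.DELETE)
                    .setItem(item)
                    .build();
        }
        // A PUT request carries the full item.
        item.put("orderId", AttributeValue.builder().n(String.valueOf(order.orderId)).build());
        item.put("price", AttributeValue.builder().n(String.valueOf(order.price)).build());
        return DynamoDbWriteRequest.builder()
                .setType(DynamoDbWriteRequestType.PUT)
                .setItem(item)
                .build();
    }
}
{code}
Such a converter would then be supplied to the DynamoDbSink builder in place of the default one.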
One option to resolve this is to obtain the table's primary keys in the
DynamicSinkFactory and provide them to the element converter used by the Table API
sink implementation. The assumption here is that the partition and sort keys of the
DDB table are mirrored as primary keys on the Flink SQL side (a rough sketch of the
idea follows the references below).
[1] -
[https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java]
[2] -
[https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbBeanElementConverter.java#L58-L64]
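A rough sketch of the trimming part of that idea (class, method and parameter names below are made up for illustration; the actual change would need to wire the PRIMARY KEY columns from the DDL through the sink factory into RowDataElementConverter):
{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

/** Sketch only: trims a converted item down to the primary key attributes for delete rows. */
public final class PrimaryKeyAwareItemMapper {

    /**
     * @param row             the incoming changelog row
     * @param convertedItem   full attribute map produced by the existing row-to-item conversion
     * @param primaryKeyNames PRIMARY KEY column names resolved by the sink factory from the DDL
     */
    public static Map<String, AttributeValue> mapItem(
            RowData row, Map<String, AttributeValue> convertedItem, Set<String> primaryKeyNames) {
        if (row.getRowKind() == RowKind.DELETE) {
            // DynamoDB delete requests must contain only the key schema attributes.
            Map<String, AttributeValue> key = new HashMap<>();
            for (String pk : primaryKeyNames) {
                key.put(pk, convertedItem.get(pk));
            }
            return key;
        }
        return convertedItem;
    }

    private PrimaryKeyAwareItemMapper() {}
}
{code}
Passing the primary key field indices instead of names would also work and may match how RowData fields are accessed by position.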
> DynamoDb Table API Sink fails to delete elements due to key not found
> ---------------------------------------------------------------------
>
> Key: FLINK-35500
> URL: https://issues.apache.org/jira/browse/FLINK-35500
> Project: Flink
> Issue Type: Bug
> Components: Connectors / DynamoDB
> Affects Versions: aws-connector-4.0.0, aws-connector-4.1.0,
> aws-connector-4.2.0
> Reporter: Ahmed Hamdy
> Priority: Major
> Fix For: aws-connector-4.4.0
>
>
> h2. Description
> When DynamoDbSink is used with CDC sources, it fails to process {{DELETE}}
> records and throws
> {quote}org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
> The provided key element does not match the schema{quote}
> This is due to {{DynamoDbSinkWriter}} passing the whole DynamoDB item as the key
> instead of the constructed primary key[1].
> Note: this issue was reported on the user mailing list[2].
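> For illustration only (not the connector code): a valid delete for the {{orders}} table
> defined in the steps below carries just its key-schema attribute, {{userId}}:
> import java.util.Map;
>
> import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
> import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
>
> public class DeleteKeyExample {
>     public static void main(String[] args) {
>         // A valid delete for the "orders" table carries only its key attribute, userId.
>         DeleteRequest validDelete = DeleteRequest.builder()
>                 .key(Map.of("userId", AttributeValue.builder().s("c").build()))
>                 .build();
>         System.out.println(validDelete);
>         // Sending the full item (orderId, price, userId) as the key is what triggers
>         // "The provided key element does not match the schema".
>     }
> }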
> h2. Steps to Reproduce
> (1) Create a new DynamoDB table in AWS. Command line:
> aws dynamodb create-table \
> --table-name orders \
> --attribute-definitions AttributeName=userId,AttributeType=S \
> --key-schema AttributeName=userId,KeyType=HASH \
> --billing-mode PAY_PER_REQUEST
> (2) Create an input file in Debezium-JSON format with the following rows to
> start:
> {"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
> {"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
> {"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
> {"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
> (3) Start the Flink SQL Client, and run the following, substituting in the
> proper local paths for the Dynamo Connector JAR file and for this local
> sample input file:
> ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
> SET 'execution.runtime-mode' = 'streaming';
> SET 'sql-client.execution.result-mode' = 'changelog';
> CREATE TABLE Orders_CDC(
> orderId BIGINT,
> price FLOAT,
> userId STRING
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = '/path/to/input_file.jsonl',
> 'format' = 'debezium-json'
> );
> CREATE TABLE Orders_Dynamo (
> orderId BIGINT,
> price FLOAT,
> userId STRING,
> PRIMARY KEY (userId) NOT ENFORCED
> ) PARTITIONED BY ( userId )
> WITH (
> 'connector' = 'dynamodb',
> 'table-name' = 'orders',
> 'aws.region' = 'us-east-1'
> );
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
> (4) At this point, everything works as expected: the 4 rows are inserted into
> DynamoDB, because they are "Insert" operations. So far, so good!
> (5) Now, add the following row to the input file. This represents a deletion
> in Debezium format, which should then cause a Deletion on the corresponding
> DynamoDB table:
> {"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
> (6) Re-Run the SQL statement:
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
> h3. References
> 1 - https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
> 2 - https://lists.apache.org/thread/ysvctpvn6n9kn0qlf5b24gxchfg64ylf