[ https://issues.apache.org/jira/browse/FLINK-35500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851194#comment-17851194 ]
Rob Goretsky commented on FLINK-35500:
--------------------------------------
With regard to determining the proper Primary Key fields to pass in the
DynamoDB Delete request, I had initially considered reusing the existing
"overwriteByPartitionKeys" value from the Table configuration. That said,
"overwriteByPartitionKeys" has a very specific documented meaning ("Used to
deduplicate write requests within each batch pushed to DynamoDB"), so
repurposing it to determine the PK for DynamoDB Delete requests would be
confusing.
[~a.pilipenko]'s proposal that we'd instead:
{quote}get information on table primary keys in DynamicSinkFactory and provide
it to element converter used by Table API sink implementation. Assumption here
is that partition and sort keys of DDB table would be mirrored as primary keys
on Flink SQL side.
{quote}
does make sense in that regard - we could document that the PRIMARY KEY
declared on the Flink SQL Table will be used when making DELETE requests to
DynamoDB. The one wrinkle is that there would now be two places where you
might need to specify the key fields - potentially in a PRIMARY KEY clause and
also in the PARTITIONED BY clause - though they'd serve different purposes and
you wouldn't always need to specify both, depending on your use case. What are
your thoughts on that?
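For illustration, here is a rough sketch of how the DynamicSinkFactory could read the declared PRIMARY KEY columns from the resolved schema (the helper class and method names are placeholders for this discussion only; the Flink catalog APIs used are the existing ones):
{code:java}
import java.util.Collections;
import java.util.List;

import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.UniqueConstraint;
import org.apache.flink.table.factories.DynamicTableFactory;

/** Placeholder helper: looks up the PRIMARY KEY columns declared on the Flink SQL table. */
final class PrimaryKeyLookupSketch {

    static List<String> primaryKeyColumns(DynamicTableFactory.Context context) {
        ResolvedSchema schema = context.getCatalogTable().getResolvedSchema();
        // Empty list when no PRIMARY KEY was declared on the table.
        return schema.getPrimaryKey()
                .map(UniqueConstraint::getColumns)
                .orElse(Collections.emptyList());
    }
}
{code}
The factory could then pass those column names down to the element converter so it knows which attributes form the DynamoDB key.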
On how we could make DELETEs work properly once we do know the Primary Key
fields:
[~chalixar] - You mentioned:
{quote}I am not sure I follow, even if the element converter did set the
primary key tag in the element, we still use the complete item as a key in
delete requests from writer[2], is this behaviour normal?
{quote}
I think there are two possible places we could make the fix; the writer
behaviour you reference is what I list below as Option 1 -
*Option 1* - DynamoDbWriteRequest [1] is part of the public API exposed by this
DynamoDB connector, and is the class that users must map their DataStream
records into when using a Custom Element Converter. DynamoDbWriteRequest
currently only has the concepts of an "Item" and a "Type", with no explicit Key
setters/getters. So we'd adjust DynamoDbWriteRequest to add the concept of a
Key. Then, in the DynamoDbSinkWriter, we could adjust how DELETE requests are
created, so that instead of doing:
{{DeleteRequest.builder().key(dynamoDbWriteRequest.getItem()).build()}}
it would instead do:
{{DeleteRequest.builder().key(dynamoDbWriteRequest.getKey()).build()}}
This would create a new responsibility for anyone implementing a Custom Element
Converter to set this Key field (using a new setKey method we'd add to
DynamoDbWriteRequest), and it could also be a breaking change for anyone not
setting it who expects DELETE operations to work properly based on the Item
itself.
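A very rough, self-contained sketch of what Option 1 might look like (the key-aware request class and its member names are assumptions for discussion, not the existing connector API):
{code:java}
import java.util.Map;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DeleteRequest;
import software.amazon.awssdk.services.dynamodb.model.WriteRequest;

public final class Option1Sketch {

    /** Simplified stand-in for a key-aware DynamoDbWriteRequest (field names are assumptions). */
    public static final class KeyAwareWriteRequest {
        private final Map<String, AttributeValue> item; // full item, used for PUT requests
        private final Map<String, AttributeValue> key;  // primary key attributes only, used for DELETEs

        public KeyAwareWriteRequest(
                Map<String, AttributeValue> item, Map<String, AttributeValue> key) {
            this.item = item;
            this.key = key;
        }

        public Map<String, AttributeValue> getItem() {
            return item;
        }

        public Map<String, AttributeValue> getKey() {
            return key;
        }
    }

    /** Writer-side change: build the batch DELETE entry from getKey() instead of getItem(). */
    static WriteRequest toDeleteWriteRequest(KeyAwareWriteRequest request) {
        return WriteRequest.builder()
                .deleteRequest(DeleteRequest.builder()
                        .key(request.getKey())
                        .build())
                .build();
    }
}
{code}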
*Option 2* - We could adjust the DELETE logic in RowDataElementConverter [3] so
that only the Primary Key fields are included in the DynamoDbWriteRequest when
calling setItem(), rather than including all fields as is currently done here
[4]. The good thing with this approach is that we do not introduce any breaking
change or any new requirement to call setKey() when constructing a
DynamoDbWriteRequest with a Custom Element Converter.
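A minimal sketch of the Option 2 filtering step (the helper name and how the primary key column names get threaded into the converter are assumptions):
{code:java}
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

import software.amazon.awssdk.services.dynamodb.model.AttributeValue;

public final class Option2Sketch {

    /**
     * Keeps only the attributes named in primaryKeys (i.e. the DynamoDB partition/sort keys)
     * so that a DELETE request carries just the key rather than the full converted item.
     */
    static Map<String, AttributeValue> projectToKey(
            Map<String, AttributeValue> fullItem, Set<String> primaryKeys) {
        return fullItem.entrySet().stream()
                .filter(entry -> primaryKeys.contains(entry.getKey()))
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }
}
{code}
The DELETE branch of RowDataElementConverter would apply something like this before calling setItem(), using the primary key column names supplied by the factory.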
I think Option 2 seems cleaner and simpler, and keeps the surface of this
change limited to just the RowDataElementConverter, but am open to thoughts or
other options here too!
1 - [https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbWriteRequest.java]
2 - [https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267]
3 - [https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java#L63-L65]
4 - [https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/table/RowDataElementConverter.java#L54-L56]
> DynamoDb Table API Sink fails to delete elements due to key not found
> ---------------------------------------------------------------------
>
> Key: FLINK-35500
> URL: https://issues.apache.org/jira/browse/FLINK-35500
> Project: Flink
> Issue Type: Bug
> Components: Connectors / DynamoDB
> Affects Versions: aws-connector-4.0.0, aws-connector-4.1.0,
> aws-connector-4.2.0
> Reporter: Ahmed Hamdy
> Priority: Major
> Fix For: aws-connector-4.4.0
>
>
> h2. Description
> When DynamoDbSink is used with CDC sources, it fails to process {{DELETE}}
> records and throws
> {quote}org.apache.flink.connector.dynamodb.shaded.software.amazon.awssdk.services.dynamodb.model.DynamoDbException:
> The provided key element does not match the schema{quote}
> This is due to {{DynamoDbSinkWriter}} passing the whole DynamoDb Item as key
> instead of the constructed primary Key[1].
> Note: The issue is reported in user mailing list[2]
> h2. Steps to Reproduce
> (1) Create a new DynamoDB table in AWS. Command line:
> aws dynamodb create-table \
> --table-name orders \
> --attribute-definitions AttributeName=userId,AttributeType=S \
> --key-schema AttributeName=userId,KeyType=HASH \
> --billing-mode PAY_PER_REQUEST
> (2) Create an input file in Debezium-JSON format with the following rows to
> start:
> {"op": "c", "after": {"orderId": 1, "userId": "a", "price": 5}}
> {"op": "c", "after": {"orderId": 2, "userId": "b", "price": 7}}
> {"op": "c", "after": {"orderId": 3, "userId": "c", "price": 9}}
> {"op": "c", "after": {"orderId": 4, "userId": "a", "price": 11}}
> (3) Start the Flink SQL Client, and run the following, substituting in the
> proper local paths for the Dynamo Connector JAR file and for this local
> sample input file:
> ADD JAR '/Users/robg/Downloads/flink-sql-connector-dynamodb-4.2.0-1.18.jar';
> SET 'execution.runtime-mode' = 'streaming';
> SET 'sql-client.execution.result-mode' = 'changelog';
> CREATE TABLE Orders_CDC(
> orderId BIGINT,
> price float,
> userId STRING
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = '/path/to/input_file.jsonl',
> 'format' = 'debezium-json'
> );
> CREATE TABLE Orders_Dynamo (
> orderId BIGINT,
> price float,
> userId STRING,
> PRIMARY KEY (userId) NOT ENFORCED
> ) PARTITIONED BY ( userId )
> WITH (
> 'connector' = 'dynamodb',
> 'table-name' = 'orders',
> 'aws.region' = 'us-east-1'
> );
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
> (4) At this point, we will see that things currently all work properly, and
> these 4 rows are inserted properly to Dynamo, because they are "Insert"
> operations. So far, so good!
> (5) Now, add the following row to the input file. This represents a deletion
> in Debezium format, which should then cause a Deletion on the corresponding
> DynamoDB table:
> {"op": "d", "before": {"orderId": 3, "userId": "c", "price": 9}}
> (6) Re-Run the SQL statement:
> INSERT INTO Orders_Dynamo SELECT * FROM Orders_CDC ;
> h3. References
> 1 - https://github.com/apache/flink-connector-aws/blob/main/flink-connector-aws/flink-connector-dynamodb/src/main/java/org/apache/flink/connector/dynamodb/sink/DynamoDbSinkWriter.java#L267
> 2 - https://lists.apache.org/thread/ysvctpvn6n9kn0qlf5b24gxchfg64ylf