[
https://issues.apache.org/jira/browse/BEAM-10706?focusedWorklogId=523232&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-523232
]
ASF GitHub Bot logged work on BEAM-10706:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Dec/20 15:48
Start Date: 11/Dec/20 15:48
Worklog Time Spent: 10m
Work Description: dennisylyung commented on a change in pull request
#12583:
URL: https://github.com/apache/beam/pull/12583#discussion_r541042726
##########
File path:
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
##########
@@ -447,19 +467,41 @@ public void setup() {
@StartBundle
public void startBundle(StartBundleContext context) {
- batch = new ArrayList<>();
+ batch = new HashMap<>();
}
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
final KV<String, WriteRequest> writeRequest =
(KV<String, WriteRequest>)
spec.getWriteItemMapperFn().apply(context.element());
- batch.add(writeRequest);
+ batch.put(
+ KV.of(writeRequest.getKey(),
extractPkeyValues(writeRequest.getValue())), writeRequest);
if (batch.size() >= BATCH_SIZE) {
flushBatch();
}
}
+ private Map<String, AttributeValue> extractPkeyValues(WriteRequest
request) {
+ if (spec.getOverwriteByPKeys() != null) {
+ if (request.getPutRequest() != null) {
+ return request.getPutRequest().getItem().entrySet().stream()
+ .filter(entry ->
spec.getOverwriteByPKeys().contains(entry.getKey()))
Review comment:
This should handle composite keys.
Supposedly, this gets the item map
```
request.getPutRequest().getItem() // Map<String, AttributeValue>
```
and then retains only the relevant keys
```
... .entrySet().stream()
.filter(...)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue))
```
The filter function checks whether the key is one of the specified
deduplication keys
```
entry -> spec.getOverwriteByPKeys().contains(entry.getKey())
```
Is this correct?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 523232)
Time Spent: 5.5h (was: 5h 20m)
> DynamoDBIO fail to write to the same key in short consecution
> -------------------------------------------------------------
>
> Key: BEAM-10706
> URL: https://issues.apache.org/jira/browse/BEAM-10706
> Project: Beam
> Issue Type: Bug
> Components: io-java-aws
> Affects Versions: 2.23.0
> Reporter: Dennis Yung
> Assignee: Dennis Yung
> Priority: P2
> Fix For: 2.27.0
>
> Time Spent: 5.5h
> Remaining Estimate: 0h
>
> Internally, DynamoDBIO.Write uses the batchWriteItem method from the AWS SDK
> to sink items. However, there is a limitation in the AWS SDK that a call to
> batchWriteItem cannot contain duplicate keys.
> Currently DynamoDBIO.Write performs no key deduplication before flushing a
> batch, which could cause ValidationException: Provided list of item keys
> contains duplicates, if consecutive updates to a single key is within the
> batch size (currently hardcoded to be 25).
> To fix this bug, the batch of write requests need to be deduplicated before
> being sent to batchRequest.addRequestItemsEntry
--
This message was sent by Atlassian Jira
(v8.3.4#803005)