[
https://issues.apache.org/jira/browse/BEAM-10706?focusedWorklogId=523224&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-523224
]
ASF GitHub Bot logged work on BEAM-10706:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 11/Dec/20 15:15
Start Date: 11/Dec/20 15:15
Worklog Time Spent: 10m
Work Description: iemejia commented on a change in pull request #12583:
URL: https://github.com/apache/beam/pull/12583#discussion_r534513398
##########
File path:
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
##########
@@ -447,19 +467,41 @@ public void setup() {
@StartBundle
public void startBundle(StartBundleContext context) {
- batch = new ArrayList<>();
+ batch = new HashMap<>();
}
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
final KV<String, WriteRequest> writeRequest =
(KV<String, WriteRequest>)
spec.getWriteItemMapperFn().apply(context.element());
- batch.add(writeRequest);
+ batch.put(
+     KV.of(writeRequest.getKey(), extractPkeyValues(writeRequest.getValue())), writeRequest);
if (batch.size() >= BATCH_SIZE) {
flushBatch();
}
}
+ private Map<String, AttributeValue> extractPkeyValues(WriteRequest request) {
+ if (spec.getOverwriteByPKeys() != null) {
Review comment:
Not needed: if we do the change suggested above, we will have a default empty
collection at construction time, so we can compare with the empty collection instead of null.
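To illustrate the suggestion, here is a minimal sketch of replacing the null check with an empty-collection default (the class and method names below are hypothetical, not the actual Beam API):

```java
import java.util.Collections;
import java.util.List;

// Hypothetical sketch: with an empty list assigned at construction time,
// the null check becomes a simple isEmpty() comparison.
class DedupConfig {
  private final List<String> overwriteByPKeys;

  DedupConfig(List<String> overwriteByPKeys) {
    // Default to an empty collection instead of leaving the field null.
    this.overwriteByPKeys =
        overwriteByPKeys == null ? Collections.emptyList() : overwriteByPKeys;
  }

  boolean deduplicationEnabled() {
    // Compare with the empty collection rather than checking for null.
    return !overwriteByPKeys.isEmpty();
  }
}
```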
##########
File path:
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
##########
@@ -358,7 +363,18 @@ public boolean test(Throwable throwable) {
abstract Builder<T> setWriteItemMapperFn(
SerializableFunction<T, KV<String, WriteRequest>> writeItemMapperFn);
- abstract Write<T> build();
+ abstract Builder<T> setOverwriteByPKeys(List<String> overwriteByPKeys);
+
+ abstract Write<T> autoBuild();
+
+ abstract Optional<List<String>> getOverwriteByPKeys();
Review comment:
Can you please remove any `Optional` use? We can set the default value
to an empty collection in the `write` method and get the same effect:
```java
public static <T> Write<T> write() {
  return new AutoValue_DynamoDBIO_Write.Builder()
      .setOverwriteByPKeys(new ArrayList<>())
      .build();
}
```
##########
File path:
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
##########
@@ -447,19 +467,41 @@ public void setup() {
@StartBundle
public void startBundle(StartBundleContext context) {
- batch = new ArrayList<>();
+ batch = new HashMap<>();
}
@ProcessElement
public void processElement(ProcessContext context) throws Exception {
final KV<String, WriteRequest> writeRequest =
(KV<String, WriteRequest>)
spec.getWriteItemMapperFn().apply(context.element());
- batch.add(writeRequest);
+ batch.put(
+     KV.of(writeRequest.getKey(), extractPkeyValues(writeRequest.getValue())), writeRequest);
if (batch.size() >= BATCH_SIZE) {
flushBatch();
}
}
+ private Map<String, AttributeValue> extractPkeyValues(WriteRequest request) {
+ if (spec.getOverwriteByPKeys() != null) {
+ if (request.getPutRequest() != null) {
+ return request.getPutRequest().getItem().entrySet().stream()
+     .filter(entry -> spec.getOverwriteByPKeys().contains(entry.getKey()))
Review comment:
Does this assume that keys are NOT composite? I mean, is the presence of all the
attributes used in the key not mandatory for deduplication? It seems that an item
carrying only one of the key attributes would pass this filter too, or am I misreading it?
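To make the composite-key concern concrete, a defensive variant could require that ALL configured key attributes are present, so an item with a partial key is rejected instead of silently matched. This is an illustrative sketch only (it uses plain `String` maps rather than DynamoDB `AttributeValue`, and the class name is hypothetical):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: extract the configured key attributes from an item
// and fail fast if any part of the composite key is missing.
class KeyExtractor {
  static Map<String, String> extractKeyValues(
      Map<String, String> item, List<String> keyAttributes) {
    Map<String, String> keys = item.entrySet().stream()
        .filter(e -> keyAttributes.contains(e.getKey()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    // Reject items that carry only part of the composite key.
    if (keys.size() != keyAttributes.size()) {
      throw new IllegalArgumentException(
          "Item is missing part of the composite key: expected "
              + keyAttributes + " but found " + keys.keySet());
    }
    return keys;
  }
}
```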
##########
File path:
sdks/java/io/amazon-web-services/src/test/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIOTest.java
##########
@@ -213,4 +219,64 @@ public void testRetries() throws Throwable {
}
fail("Pipeline is expected to fail because we were unable to write to DynamoDB.");
}
+
+ /**
+  * A DoFn used to generate outputs duplicated N times, where N is the input. Used to generate
+  * bundles with duplicate elements.
+  */
+ private static class WriteDuplicateGeneratorDoFn extends DoFn<Integer, WriteRequest> {
+ @ProcessElement
+ public void processElement(ProcessContext ctx) {
+ for (int i = 0; i < ctx.element(); i++) {
+ DynamoDBIOTestHelper.generateWriteRequests(numOfItems).forEach(ctx::output);
+ }
+ }
+ }
+
+ @Test
+ public void testWriteDeduplication() {
+ // designate duplication factor for each bundle
+ final List<Integer> duplications = Arrays.asList(1, 2, 3);
+
+ final List<String> overwriteByPKeys =
+     Arrays.asList(DynamoDBIOTestHelper.ATTR_NAME_1, DynamoDBIOTestHelper.ATTR_NAME_2);
+
+ AmazonDynamoDB amazonDynamoDBMock = Mockito.mock(AmazonDynamoDB.class);
+
+ pipeline
+ .apply(Create.of(duplications))
+ .apply("duplicate", ParDo.of(new WriteDuplicateGeneratorDoFn()))
+ .apply(
+ DynamoDBIO.<WriteRequest>write()
+     .withWriteRequestMapperFn(
+         (SerializableFunction<WriteRequest, KV<String, WriteRequest>>)
+             writeRequest -> KV.of(tableName, writeRequest))
+     .withRetryConfiguration(
+         DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
+     .withAwsClientsProvider(AwsClientsProviderMock.of(amazonDynamoDBMock))
+     .withOverwriteByPKeys(overwriteByPKeys));
+
+ pipeline.run().waitUntilFinish();
+
+ ArgumentCaptor<BatchWriteItemRequest> argumentCaptor =
Review comment:
:+1:
##########
File path:
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
##########
@@ -85,7 +87,8 @@
* t -> KV.of(tableName, writeRequest))
* .withRetryConfiguration(
* DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
- * .withAwsClientsProvider(new BasicDynamoDbProvider(accessKey, secretKey, region));
+ * .withAwsClientsProvider(new BasicDynamoDbProvider(accessKey, secretKey, region))
+ * .withOverwriteByPKeys(overwriteByPKeys));
Review comment:
Can we remove this from the basic example and add a separate explanation,
similar to this (or improved with the explanation you gave in the discussion):
`If your stream contains duplicate primary keys, you might get a ValidationException
because AWS does not allow writing the same keys as part of the same batch
operation. In this case you may need to explicitly set the attributes that
correspond to your primary key so they are deduplicated, using the
`withOverwriteByPKeys` method`
##########
File path:
sdks/java/io/amazon-web-services/src/main/java/org/apache/beam/sdk/io/aws/dynamodb/DynamoDBIO.java
##########
@@ -85,7 +87,8 @@
* t -> KV.of(tableName, writeRequest))
* .withRetryConfiguration(
* DynamoDBIO.RetryConfiguration.create(5, Duration.standardMinutes(1)))
- * .withAwsClientsProvider(new BasicDynamoDbProvider(accessKey, secretKey, region));
+ * .withAwsClientsProvider(new BasicDynamoDbProvider(accessKey, secretKey, region))
+ * .withOverwriteByPKeys(overwriteByPKeys));
Review comment:
Also, maybe we can find a clearer name for this method/attribute, e.g.
`withDeduplicateKeys`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 523224)
Time Spent: 4h 50m (was: 4h 40m)
> DynamoDBIO fail to write to the same key in short consecution
> -------------------------------------------------------------
>
> Key: BEAM-10706
> URL: https://issues.apache.org/jira/browse/BEAM-10706
> Project: Beam
> Issue Type: Bug
> Components: io-java-aws
> Affects Versions: 2.23.0
> Reporter: Dennis Yung
> Assignee: Dennis Yung
> Priority: P2
> Fix For: 2.27.0
>
> Time Spent: 4h 50m
> Remaining Estimate: 0h
>
> Internally, DynamoDBIO.Write uses the batchWriteItem method from the AWS SDK
> to sink items. However, the AWS SDK has a limitation that a single call to
> batchWriteItem cannot contain duplicate keys.
> Currently, DynamoDBIO.Write performs no key deduplication before flushing a
> batch, which can cause "ValidationException: Provided list of item keys
> contains duplicates" if consecutive updates to a single key fall within the
> batch size (currently hardcoded to 25).
> To fix this bug, the batch of write requests needs to be deduplicated before
> being sent to batchRequest.addRequestItemsEntry.
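The deduplication described above can be sketched as follows. This is an illustrative model only, not the merged Beam code: it uses plain `String` maps instead of DynamoDB `AttributeValue`, and the class name is hypothetical. Buffering the bundle in a map keyed by the primary-key attribute values means a later write to the same key replaces the earlier one, so a flushed batch never contains duplicate keys.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of batching with last-write-wins deduplication.
class BatchDeduplicator {
  private final List<String> keyAttributes;
  // Keyed by the values of the primary-key attributes; insertion order kept.
  private final Map<List<String>, Map<String, String>> batch = new LinkedHashMap<>();

  BatchDeduplicator(List<String> keyAttributes) {
    this.keyAttributes = keyAttributes;
  }

  void add(Map<String, String> item) {
    List<String> key = new ArrayList<>();
    for (String attr : keyAttributes) {
      key.add(item.get(attr));
    }
    // A later write to the same key replaces the earlier one in the batch.
    batch.put(key, item);
  }

  Collection<Map<String, String>> flush() {
    // The flushed batch contains at most one item per primary key.
    List<Map<String, String>> out = new ArrayList<>(batch.values());
    batch.clear();
    return out;
  }
}
```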
--
This message was sent by Atlassian Jira
(v8.3.4#803005)