aromanenko-dev commented on a change in pull request #15964:
URL: https://github.com/apache/beam/pull/15964#discussion_r768827317
##########
File path:
sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/dynamodb/DynamoDBIO.java
##########
@@ -542,54 +562,64 @@ private void flushBatch() throws IOException,
InterruptedException {
if (batch.isEmpty()) {
return;
}
-
try {
- // Since each element is a KV<tableName, writeRequest> in the batch,
we need to group them
- // by tableName
- Map<String, List<WriteRequest>> mapTableRequest =
+ // Group values KV<tableName, writeRequest> by tableName
+ // Note: The original order of arrival is lost reading the map
entries.
+ Map<String, List<WriteRequest>> writesPerTable =
batch.values().stream()
- .collect(
- Collectors.groupingBy(
- KV::getKey, Collectors.mapping(KV::getValue,
Collectors.toList())));
-
- BatchWriteItemRequest batchRequest =
-
BatchWriteItemRequest.builder().requestItems(mapTableRequest).build();
-
- Sleeper sleeper = Sleeper.DEFAULT;
- BackOff backoff = retryBackoff.backoff();
- int attempt = 0;
- while (true) {
- attempt++;
- try {
- client.batchWriteItem(batchRequest);
- break;
- } catch (Exception ex) {
- // Fail right away if there is no retry configuration
- if (spec.getRetryConfiguration() == null
- ||
!spec.getRetryConfiguration().getRetryPredicate().test(ex)) {
- DYNAMO_DB_WRITE_FAILURES.inc();
- LOG.info(
- "Unable to write batch items {}.",
batchRequest.requestItems().entrySet(), ex);
- throw new IOException("Error writing to DynamoDB (no attempt
made to retry)", ex);
- }
-
- if (!BackOffUtils.next(sleeper, backoff)) {
- throw new IOException(
- String.format(
- "Error writing to DynamoDB after %d attempt(s). No
more attempts allowed",
- attempt),
- ex);
- } else {
- // Note: this used in test cases to verify behavior
- LOG.warn(String.format(RETRY_ATTEMPT_LOG, attempt), ex);
- }
- }
+ .collect(groupingBy(KV::getKey, mapping(KV::getValue,
toList())));
+
+ // Backoff used to resume from partial failures
+ BackOff resume = resumeBackoff.backoff();
+ do {
+ BatchWriteItemRequest batchRequest =
+
BatchWriteItemRequest.builder().requestItems(writesPerTable).build();
+ // If unprocessed items remain, we have to resume the operation
(with backoff)
+ writesPerTable = writeWithRetries(batchRequest).unprocessedItems();
+ } while (!writesPerTable.isEmpty() &&
BackOffUtils.next(Sleeper.DEFAULT, resume));
+
+ if (!writesPerTable.isEmpty()) {
+ DYNAMO_DB_WRITE_FAILURES.inc();
+ LOG.warn(RESUME_ERROR_LOG, writesPerTable);
Review comment:
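Should this be logged at `error` level rather than `warn`? At this point the resume backoff is exhausted and items are still unprocessed (the failure counter is incremented just above), so the write has effectively failed.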
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]