This is an automated email from the ASF dual-hosted git repository. vbalaji pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 74793d5d6bd [HUDI-7106] Fix sqs deletes, deltasync service close and error table default configs. (#10117)
74793d5d6bd is described below

commit 74793d5d6bddce9f28db6c0dfce08539d92f1d19
Author: Rajesh Mahindra <76502047+rmahindra...@users.noreply.github.com>
AuthorDate: Tue Nov 21 09:53:12 2023 -0800

    [HUDI-7106] Fix sqs deletes, deltasync service close and error table default configs. (#10117)

    Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>
---
 .../hudi/utilities/sources/helpers/CloudObjectsSelector.java | 12 ++++++++----
 .../org/apache/hudi/utilities/streamer/ErrorTableUtils.java  |  2 +-
 .../java/org/apache/hudi/utilities/streamer/StreamSync.java  |  5 ++---
 3 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
index efe2913255f..8c447d93a0f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
@@ -200,9 +200,12 @@ public class CloudObjectsSelector {
    * Delete batch of messages from queue.
    */
   protected void deleteBatchOfMessages(SqsClient sqs, String queueUrl, List<Message> messagesToBeDeleted) {
-    DeleteMessageBatchRequest deleteBatchReq =
-        DeleteMessageBatchRequest.builder().queueUrl(queueUrl).build();
-    List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>(deleteBatchReq.entries());
+    if (messagesToBeDeleted.isEmpty()) {
+      return;
+    }
+    DeleteMessageBatchRequest.Builder builder = DeleteMessageBatchRequest.builder().queueUrl(queueUrl);
+    List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>();
+
     for (Message message : messagesToBeDeleted) {
       deleteEntries.add(
           DeleteMessageBatchRequestEntry.builder()
@@ -210,7 +213,8 @@ public class CloudObjectsSelector {
               .receiptHandle(message.receiptHandle())
               .build());
     }
-    DeleteMessageBatchResponse deleteResponse = sqs.deleteMessageBatch(deleteBatchReq);
+    builder.entries(deleteEntries);
+    DeleteMessageBatchResponse deleteResponse = sqs.deleteMessageBatch(builder.build());
     List<String> deleteFailures =
         deleteResponse.failed().stream()
             .map(BatchResultErrorEntry::id)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
index 694990cf1fa..8907a1b6647 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
@@ -64,7 +64,7 @@ public final class ErrorTableUtils {

   public static HoodieErrorTableConfig.ErrorWriteFailureStrategy getErrorWriteFailureStrategy(
       TypedProperties props) {
-    String writeFailureStrategy = props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key());
+    String writeFailureStrategy = props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key(), ERROR_TABLE_WRITE_FAILURE_STRATEGY.defaultValue());
     return HoodieErrorTableConfig.ErrorWriteFailureStrategy.valueOf(writeFailureStrategy);
   }

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 657474525f1..7f6f254939d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -119,12 +119,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;

 import scala.Tuple2;
 import scala.collection.JavaConversions;
@@ -982,7 +981,7 @@ public class StreamSync implements Serializable, Closeable {
   }

   public void runMetaSync() {
-    Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(",")));
+    List<String> syncClientToolClasses = Arrays.stream(cfg.syncClientToolClassNames.split(",")).distinct().collect(Collectors.toList());
     // for backward compatibility
     if (cfg.enableHiveSync) {
       cfg.enableMetaSync = true;