This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 74793d5d6bd [HUDI-7106] Fix sqs deletes, deltasync service close and error table default configs. (#10117)
74793d5d6bd is described below

commit 74793d5d6bddce9f28db6c0dfce08539d92f1d19
Author: Rajesh Mahindra <76502047+rmahindra...@users.noreply.github.com>
AuthorDate: Tue Nov 21 09:53:12 2023 -0800

    [HUDI-7106] Fix sqs deletes, deltasync service close and error table default configs. (#10117)
    
    Co-authored-by: rmahindra123 <rmahindra@Rajeshs-MacBook-Pro.local>
---
 .../hudi/utilities/sources/helpers/CloudObjectsSelector.java | 12 ++++++++----
 .../org/apache/hudi/utilities/streamer/ErrorTableUtils.java  |  2 +-
 .../java/org/apache/hudi/utilities/streamer/StreamSync.java  |  5 ++---
 3 files changed, 11 insertions(+), 8 deletions(-)

diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
index efe2913255f..8c447d93a0f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
@@ -200,9 +200,12 @@ public class CloudObjectsSelector {
    * Delete batch of messages from queue.
    */
   protected void deleteBatchOfMessages(SqsClient sqs, String queueUrl, List<Message> messagesToBeDeleted) {
-    DeleteMessageBatchRequest deleteBatchReq =
-        DeleteMessageBatchRequest.builder().queueUrl(queueUrl).build();
-    List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>(deleteBatchReq.entries());
+    if (messagesToBeDeleted.isEmpty()) {
+      return;
+    }
+    DeleteMessageBatchRequest.Builder builder = DeleteMessageBatchRequest.builder().queueUrl(queueUrl);
+    List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>();
+
     for (Message message : messagesToBeDeleted) {
       deleteEntries.add(
           DeleteMessageBatchRequestEntry.builder()
@@ -210,7 +213,8 @@ public class CloudObjectsSelector {
                   .receiptHandle(message.receiptHandle())
                   .build());
     }
-    DeleteMessageBatchResponse deleteResponse = sqs.deleteMessageBatch(deleteBatchReq);
+    builder.entries(deleteEntries);
+    DeleteMessageBatchResponse deleteResponse = sqs.deleteMessageBatch(builder.build());
     List<String> deleteFailures =
         deleteResponse.failed().stream()
             .map(BatchResultErrorEntry::id)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
index 694990cf1fa..8907a1b6647 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
@@ -64,7 +64,7 @@ public final class ErrorTableUtils {
 
   public static HoodieErrorTableConfig.ErrorWriteFailureStrategy getErrorWriteFailureStrategy(
       TypedProperties props) {
-    String writeFailureStrategy = props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key());
+    String writeFailureStrategy = props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key(), ERROR_TABLE_WRITE_FAILURE_STRATEGY.defaultValue());
     return HoodieErrorTableConfig.ErrorWriteFailureStrategy.valueOf(writeFailureStrategy);
   }
 
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 657474525f1..7f6f254939d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -119,12 +119,11 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 import scala.Tuple2;
 import scala.collection.JavaConversions;
@@ -982,7 +981,7 @@ public class StreamSync implements Serializable, Closeable {
   }
 
   public void runMetaSync() {
-    Set<String> syncClientToolClasses = new HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(",")));
+    List<String> syncClientToolClasses = Arrays.stream(cfg.syncClientToolClassNames.split(",")).distinct().collect(Collectors.toList());
     // for backward compatibility
     if (cfg.enableHiveSync) {
       cfg.enableMetaSync = true;

Reply via email to