This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 74793d5d6bd [HUDI-7106] Fix sqs deletes, deltasync service close and
error table default configs. (#10117)
74793d5d6bd is described below
commit 74793d5d6bddce9f28db6c0dfce08539d92f1d19
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Tue Nov 21 09:53:12 2023 -0800
[HUDI-7106] Fix sqs deletes, deltasync service close and error table
default configs. (#10117)
Co-authored-by: rmahindra123 <[email protected]>
---
.../hudi/utilities/sources/helpers/CloudObjectsSelector.java | 12 ++++++++----
.../org/apache/hudi/utilities/streamer/ErrorTableUtils.java | 2 +-
.../java/org/apache/hudi/utilities/streamer/StreamSync.java | 5 ++---
3 files changed, 11 insertions(+), 8 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
index efe2913255f..8c447d93a0f 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/CloudObjectsSelector.java
@@ -200,9 +200,12 @@ public class CloudObjectsSelector {
* Delete batch of messages from queue.
*/
protected void deleteBatchOfMessages(SqsClient sqs, String queueUrl,
List<Message> messagesToBeDeleted) {
- DeleteMessageBatchRequest deleteBatchReq =
- DeleteMessageBatchRequest.builder().queueUrl(queueUrl).build();
- List<DeleteMessageBatchRequestEntry> deleteEntries = new
ArrayList<>(deleteBatchReq.entries());
+ if (messagesToBeDeleted.isEmpty()) {
+ return;
+ }
+ DeleteMessageBatchRequest.Builder builder =
DeleteMessageBatchRequest.builder().queueUrl(queueUrl);
+ List<DeleteMessageBatchRequestEntry> deleteEntries = new ArrayList<>();
+
for (Message message : messagesToBeDeleted) {
deleteEntries.add(
DeleteMessageBatchRequestEntry.builder()
@@ -210,7 +213,8 @@ public class CloudObjectsSelector {
.receiptHandle(message.receiptHandle())
.build());
}
- DeleteMessageBatchResponse deleteResponse =
sqs.deleteMessageBatch(deleteBatchReq);
+ builder.entries(deleteEntries);
+ DeleteMessageBatchResponse deleteResponse =
sqs.deleteMessageBatch(builder.build());
List<String> deleteFailures =
deleteResponse.failed().stream()
.map(BatchResultErrorEntry::id)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
index 694990cf1fa..8907a1b6647 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorTableUtils.java
@@ -64,7 +64,7 @@ public final class ErrorTableUtils {
public static HoodieErrorTableConfig.ErrorWriteFailureStrategy
getErrorWriteFailureStrategy(
TypedProperties props) {
- String writeFailureStrategy =
props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key());
+ String writeFailureStrategy =
props.getString(ERROR_TABLE_WRITE_FAILURE_STRATEGY.key(),
ERROR_TABLE_WRITE_FAILURE_STRATEGY.defaultValue());
return
HoodieErrorTableConfig.ErrorWriteFailureStrategy.valueOf(writeFailureStrategy);
}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 657474525f1..7f6f254939d 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -119,12 +119,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
-import java.util.Set;
import java.util.function.Function;
+import java.util.stream.Collectors;
import scala.Tuple2;
import scala.collection.JavaConversions;
@@ -982,7 +981,7 @@ public class StreamSync implements Serializable, Closeable {
}
public void runMetaSync() {
- Set<String> syncClientToolClasses = new
HashSet<>(Arrays.asList(cfg.syncClientToolClassNames.split(",")));
+ List<String> syncClientToolClasses =
Arrays.stream(cfg.syncClientToolClassNames.split(",")).distinct().collect(Collectors.toList());
// for backward compatibility
if (cfg.enableHiveSync) {
cfg.enableMetaSync = true;