This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c4680e4856d [HUDI-8383] Cached `OptionsResolver::isInsertOverwrite` in
`BucketStreamWriteFunction::processElement` (#12113)
c4680e4856d is described below
commit c4680e4856d5f6ab724b182b903182c4f00abda1
Author: Geser Dugarov <[email protected]>
AuthorDate: Thu Oct 17 12:19:49 2024 +0700
[HUDI-8383] Cached `OptionsResolver::isInsertOverwrite` in
`BucketStreamWriteFunction::processElement` (#12113)
---
.../hudi/sink/bucket/BucketStreamWriteFunction.java | 17 ++++++++++++-----
1 file changed, 12 insertions(+), 5 deletions(-)
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
index 1475cf043b3..6c31f37cc91 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bucket/BucketStreamWriteFunction.java
@@ -80,6 +80,12 @@ public class BucketStreamWriteFunction<I> extends
StreamWriteFunction<I> {
*/
private Functions.Function2<String, Integer, Integer> partitionIndexFunc;
+
+ /**
+ * Resolved once during open() to avoid a string comparison for every record.
+ */
+ private boolean isInsertOverwrite;
+
/**
* Constructs a BucketStreamWriteFunction.
*
@@ -100,6 +106,7 @@ public class BucketStreamWriteFunction<I> extends
StreamWriteFunction<I> {
this.bucketIndex = new HashMap<>();
this.incBucketIndex = new HashSet<>();
this.partitionIndexFunc = BucketIndexUtil.getPartitionIndexFunc(bucketNum,
parallelism);
+ this.isInsertOverwrite = OptionsResolver.isInsertOverwrite(config);
}
@Override
@@ -120,7 +127,11 @@ public class BucketStreamWriteFunction<I> extends
StreamWriteFunction<I> {
final String partition = hoodieKey.getPartitionPath();
final HoodieRecordLocation location;
- bootstrapIndexIfNeed(partition);
+ // for insert overwrite operation skip the index loading
+ if (!isInsertOverwrite) {
+ bootstrapIndexIfNeed(partition);
+ }
+
Map<Integer, String> bucketToFileId =
bucketIndex.computeIfAbsent(partition, p -> new HashMap<>());
final int bucketNum = BucketIdentifier.getBucketId(hoodieKey,
indexKeyFields, this.bucketNum);
final String bucketId = partition + "/" + bucketNum;
@@ -154,10 +165,6 @@ public class BucketStreamWriteFunction<I> extends
StreamWriteFunction<I> {
* This is a required operation for each restart to avoid having duplicate
file ids for one bucket.
*/
private void bootstrapIndexIfNeed(String partition) {
- if (OptionsResolver.isInsertOverwrite(config)) {
- // skips the index loading for insert overwrite operation.
- return;
- }
if (bucketIndex.containsKey(partition)) {
return;
}