This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 1562bb658f [HUDI-4031] Avoid clustering update handling when no pending replacecommit (#5487)
1562bb658f is described below
commit 1562bb658f8f29f57763eaa6f9bd5a2ed7e80a7c
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed May 4 19:47:11 2022 +0530
[HUDI-4031] Avoid clustering update handling when no pending replacecommit (#5487)
---
.../hudi/table/action/commit/BaseSparkCommitActionExecutor.java | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ade5508977..205da82ac1 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -112,10 +112,8 @@ public abstract class BaseSparkCommitActionExecutor<T
extends HoodieRecordPayloa
}
}
- private HoodieData<HoodieRecord<T>>
clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords) {
+ private HoodieData<HoodieRecord<T>>
clusteringHandleUpdate(HoodieData<HoodieRecord<T>> inputRecords,
Set<HoodieFileGroupId> fileGroupsInPendingClustering) {
context.setJobStatus(this.getClass().getSimpleName(), "Handling updates
which are under clustering");
- Set<HoodieFileGroupId> fileGroupsInPendingClustering =
-
table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
UpdateStrategy<T, HoodieData<HoodieRecord<T>>> updateStrategy =
(UpdateStrategy<T, HoodieData<HoodieRecord<T>>>) ReflectionUtils
.loadClass(config.getClusteringUpdatesStrategyClass(), this.context,
fileGroupsInPendingClustering);
Pair<HoodieData<HoodieRecord<T>>, Set<HoodieFileGroupId>>
recordsAndPendingClusteringFileGroups =
@@ -166,7 +164,9 @@ public abstract class BaseSparkCommitActionExecutor<T
extends HoodieRecordPayloa
}
// handle records update with clustering
- HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate =
clusteringHandleUpdate(inputRecords);
+ Set<HoodieFileGroupId> fileGroupsInPendingClustering =
+
table.getFileSystemView().getFileGroupsInPendingClustering().map(Pair::getKey).collect(Collectors.toSet());
+ HoodieData<HoodieRecord<T>> inputRecordsWithClusteringUpdate =
fileGroupsInPendingClustering.isEmpty() ? inputRecords :
clusteringHandleUpdate(inputRecords, fileGroupsInPendingClustering);
context.setJobStatus(this.getClass().getSimpleName(), "Doing partition and
writing data");
HoodieData<WriteStatus> writeStatuses =
mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);