This is an automated email from the ASF dual-hosted git repository.
vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 130498708bb add job context (#10848)
130498708bb is described below
commit 130498708bb1cd5da1d0e725971b3d721eeef231
Author: Tim Brown <[email protected]>
AuthorDate: Mon Mar 11 20:42:02 2024 -0500
add job context (#10848)
---
.../org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java | 1 +
.../src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java | 3 ++-
2 files changed, 3 insertions(+), 1 deletion(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 15f7b8a6b2c..36f75b6a5b0 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -263,6 +263,7 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
Iterator<HoodieRecord<T>> recordItr) throws IOException;
protected HoodieWriteMetadata<HoodieData<WriteStatus>>
executeClustering(HoodieClusteringPlan clusteringPlan) {
+ context.setJobStatus(this.getClass().getSimpleName(), "Clustering records for " + config.getTableName());
HoodieInstant instant =
HoodieTimeline.getReplaceCommitRequestedInstant(instantTime);
// Mark instant as clustering inflight
table.getActiveTimeline().transitionReplaceRequestedToInflight(instant,
Option.empty());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index ebb7729c1ee..2d8e0f02c31 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -513,6 +513,7 @@ public class StreamSync implements Serializable, Closeable {
private InputBatch fetchFromSourceAndPrepareRecords(Option<String>
resumeCheckpointStr, String instantTime,
HoodieTableMetaClient metaClient) {
+ hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Fetching next batch: " + cfg.targetTableName);
HoodieRecordType recordType = createRecordMerger(props).getRecordType();
if (recordType == HoodieRecordType.SPARK &&
HoodieTableType.valueOf(cfg.tableType) == HoodieTableType.MERGE_ON_READ
&& !cfg.operation.equals(WriteOperationType.BULK_INSERT)
@@ -535,7 +536,7 @@ public class StreamSync implements Serializable, Closeable {
}
// handle empty batch with change in checkpoint
- hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty");
+ hoodieSparkContext.setJobStatus(this.getClass().getSimpleName(), "Checking if input is empty: " + cfg.targetTableName);
if (useRowWriter) { // no additional processing required for row writer.