This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 3525983c90e [HUDI-7624] Fixing source read and index tagging duration (#12789)
3525983c90e is described below
commit 3525983c90eb2fd02682596ada304ba11897290e
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Tue Feb 18 21:49:28 2025 -0800
[HUDI-7624] Fixing source read and index tagging duration (#12789)
---------
Co-authored-by: Rajesh Mahindra <[email protected]>
---
.../action/commit/BaseCommitActionExecutor.java | 5 +++++
.../hudi/table/action/commit/BaseWriteHelper.java | 5 ++++-
.../table/action/commit/HoodieDeleteHelper.java | 5 ++++-
.../org/apache/hudi/metrics/TestHoodieMetrics.java | 9 +++++++++
.../commit/BaseFlinkCommitActionExecutor.java | 6 ++++++
.../commit/BaseJavaCommitActionExecutor.java | 7 ++++++-
.../commit/BaseSparkCommitActionExecutor.java | 22 +++++++++++++++-------
7 files changed, 49 insertions(+), 10 deletions(-)
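
For orientation before the file-by-file diff: the patch starts a HoodieTimer in the write helpers, before de-duplication and index tagging, and hands it (wrapped in an Option) to the commit action executors, which stop it once the workload profile is built and record the duration on the write metadata. A compact, runnable sketch of that hand-off, using java.util.Optional and System.nanoTime() purely as stand-ins for Hudi's Option and HoodieTimer:

    import java.util.Optional;

    public class SourceReadAndIndexTimingSketch {
      public static void main(String[] args) throws InterruptedException {
        // Write helper side: start timing before de-dupe and index tagging.
        long startNanos = System.nanoTime();
        Thread.sleep(5); // stands in for de-dupe and tag-location work

        // Executor side: stop the timer only if one was handed over.
        Optional<Long> timer = Optional.of(startNanos);
        timer.ifPresent(s -> System.out.println(
            "source read and index took " + (System.nanoTime() - s) / 1_000_000L + " ms"));
      }
    }
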
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index eff9dd8bf65..255dfab4460 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -42,6 +42,7 @@ import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
@@ -115,6 +116,10 @@ public abstract class BaseCommitActionExecutor<T, I, K, O, R>
public abstract HoodieWriteMetadata<O> execute(I inputRecords);
+ public HoodieWriteMetadata<O> execute(I inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
+ return this.execute(inputRecords);
+ }
+
/**
* Save the workload profile in an intermediate file (here re-using commit files) This is useful when performing
* rollback for MOR tables. Only updates are recorded in the workload profile metadata since updates to log blocks
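
The default implementation added above is what keeps the existing single-argument execute contract intact: engines that never pass a timer fall through to the old path unchanged. The shape of the pattern, reduced to generics (hypothetical names, not Hudi's actual signatures; Optional<Long> stands in for Option<HoodieTimer>):

    import java.util.Optional;

    abstract class CommitExecutorSketch<I, O> {
      // Pre-existing entry point.
      public abstract O execute(I inputRecords);

      // New entry point; the default drops the timer, so only executors
      // that override it (e.g. the Spark one below) report the duration.
      public O execute(I inputRecords, Optional<Long> sourceReadAndIndexTimer) {
        return execute(inputRecords);
      }
    }
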
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
index ff47b636098..5d6bb6048e5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseWriteHelper.java
@@ -24,6 +24,8 @@ import org.apache.hudi.common.function.SerializableFunctionUnchecked;
import org.apache.hudi.common.model.HoodieRecordMerger;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.util.HoodieRecordUtils;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.exception.HoodieUpsertException;
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.table.HoodieTable;
@@ -44,6 +46,7 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I
BaseCommitActionExecutor<T, I, K, O, R> executor,
WriteOperationType operationType) {
try {
+ HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start();
// De-dupe/merge if needed
I dedupedRecords = combineOnCondition(shouldCombine, inputRecords, configuredShuffleParallelism, table);
@@ -55,7 +58,7 @@ public abstract class BaseWriteHelper<T, I, K, O, R> extends ParallelismHelper<I
taggedRecords = tag(dedupedRecords, context, table);
}
- HoodieWriteMetadata<O> result = executor.execute(taggedRecords);
+ HoodieWriteMetadata<O> result = executor.execute(taggedRecords, Option.of(sourceReadAndIndexTimer));
return result;
} catch (Throwable e) {
if (e instanceof HoodieUpsertException) {
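
Note the placement in the hunk above: the timer starts before the de-dupe and tag steps, so the reported duration covers de-duplication and index lookup in addition to the workload-profile build timed in the Spark executor further down. A runnable sketch of that ordering with the Hudi steps stubbed out (all names here are stand-ins):

    import java.util.List;
    import java.util.Optional;

    public class WriteHelperOrderSketch {
      static List<String> dedupe(List<String> in) { return in; }       // stub
      static List<String> tagLocations(List<String> in) { return in; } // stub

      public static void main(String[] args) {
        long startNanos = System.nanoTime(); // HoodieTimer.start() equivalent
        List<String> tagged = tagLocations(dedupe(List.of("r1", "r2")));
        // executor.execute(taggedRecords, Option.of(sourceReadAndIndexTimer)):
        execute(tagged, Optional.of(startNanos));
      }

      static void execute(List<String> records, Optional<Long> timer) {
        timer.ifPresent(s -> System.out.println(
            records.size() + " records; read+index ms: " + (System.nanoTime() - s) / 1_000_000L));
      }
    }
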
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
index 17dd4282e14..413e77f87da 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/HoodieDeleteHelper.java
@@ -28,6 +28,8 @@ import org.apache.hudi.common.model.HoodieEmptyRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieUpsertException;
@@ -79,6 +81,7 @@ public class HoodieDeleteHelper<T, R> extends
HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>> table,
BaseCommitActionExecutor<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, HoodieData<WriteStatus>, R> deleteExecutor) {
try {
+ HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start();
int targetParallelism = deduceShuffleParallelism((HoodieData) keys, config.getDeleteShuffleParallelism());
@@ -100,7 +103,7 @@ public class HoodieDeleteHelper<T, R> extends
HoodieData<HoodieRecord<T>> taggedValidRecords = taggedRecords.filter(HoodieRecord::isCurrentLocationKnown);
HoodieWriteMetadata<HoodieData<WriteStatus>> result;
if (!taggedValidRecords.isEmpty()) {
- result = deleteExecutor.execute(taggedValidRecords);
+ result = deleteExecutor.execute(taggedValidRecords, Option.of(sourceReadAndIndexTimer));
result.setIndexLookupDuration(tagLocationDuration);
} else {
// if entire set of keys are non existent
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
index ee5dba93b00..7da3c9479b7 100755
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metrics/TestHoodieMetrics.java
@@ -44,6 +44,7 @@ import java.util.UUID;
import java.util.stream.Stream;
import static org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static org.apache.hudi.metrics.HoodieMetrics.SOURCE_READ_AND_INDEX_ACTION;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -127,6 +128,14 @@ public class TestHoodieMetrics {
}
}
+ // PreWrite metrics
+ timer = hoodieMetrics.getSourceReadAndIndexTimerCtx();
+ Thread.sleep(5); // Ensure timer duration is > 0
+ hoodieMetrics.updateSourceReadAndIndexMetrics("some_action", hoodieMetrics.getDurationInMs(timer.stop()));
+ metricName = hoodieMetrics.getMetricsName(SOURCE_READ_AND_INDEX_ACTION, "some_action.duration");
+ msec = (Long)metrics.getRegistry().getGauges().get(metricName).getValue();
+ assertTrue(msec > 0);
+
// Rollback metrics
timer = hoodieMetrics.getRollbackCtx();
Thread.sleep(5); // Ensure timer duration is > 0
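
The new test block above follows the file's existing pattern: start a timer context, sleep so the duration is non-zero, publish the metric, then read it back from the registry as a gauge. A hedged, self-contained version of that read-back assertion using plain Dropwizard Metrics (the registry setup and metric name here are illustrative, not Hudi's actual keys):

    import com.codahale.metrics.Gauge;
    import com.codahale.metrics.MetricRegistry;

    public class GaugeReadBackSketch {
      public static void main(String[] args) throws InterruptedException {
        MetricRegistry registry = new MetricRegistry();
        long start = System.nanoTime();
        Thread.sleep(5); // ensure the duration is > 0, as in the test
        final long durationMs = (System.nanoTime() - start) / 1_000_000L;

        // updateSourceReadAndIndexMetrics(...) equivalent: publish a gauge.
        String metricName = "source_read_and_index.some_action.duration";
        registry.register(metricName, (Gauge<Long>) () -> durationMs);

        // Read it back the way the test does, and assert it is positive.
        long reported = (Long) registry.getGauges().get(metricName).getValue();
        if (reported <= 0) {
          throw new AssertionError("expected positive duration, got " + reported);
        }
      }
    }
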
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
index e18052f002a..e285657dead 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/action/commit/BaseFlinkCommitActionExecutor.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieNotSupportedException;
@@ -90,6 +91,11 @@ public abstract class BaseFlinkCommitActionExecutor<T> extends
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
+ return execute(inputRecords, Option.empty());
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
List<WriteStatus> writeStatuses = new LinkedList<>();
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
index 2a9c2b86024..3d503239c33 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/table/action/commit/BaseJavaCommitActionExecutor.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.CommitUtils;
+import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
@@ -86,12 +87,16 @@ public abstract class BaseJavaCommitActionExecutor<T> extends
@Override
public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords) {
+ return execute(inputRecords, Option.empty());
+ }
+
+ @Override
+ public HoodieWriteMetadata<List<WriteStatus>> execute(List<HoodieRecord<T>> inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
HoodieWriteMetadata<List<WriteStatus>> result = new HoodieWriteMetadata<>();
WorkloadProfile workloadProfile = new WorkloadProfile(buildProfile(inputRecords), table.getIndex().canIndexLogFiles());
LOG.info("Input workload profile :" + workloadProfile);
-
final Partitioner partitioner = getPartitioner(workloadProfile);
try {
saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index 98f1469e5ee..183d97f8ce7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -148,13 +148,18 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
@Override
public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords) {
+ return this.execute(inputRecords, Option.empty());
+ }
+
+ @Override
+ public HoodieWriteMetadata<HoodieData<WriteStatus>> execute(HoodieData<HoodieRecord<T>> inputRecords, Option<HoodieTimer> sourceReadAndIndexTimer) {
// Cache the tagged records, so we don't end up computing both
JavaRDD<HoodieRecord<T>> inputRDD = HoodieJavaRDD.getJavaRDD(inputRecords);
if (inputRDD.getStorageLevel() == StorageLevel.NONE()) {
HoodieJavaRDD.of(inputRDD).persist(config.getTaggedRecordStorageLevel(), context, HoodieDataCacheKey.of(config.getBasePath(), instantTime));
} else {
- LOG.info("RDD PreppedRecords was persisted at: " +
inputRDD.getStorageLevel());
+ LOG.info("RDD PreppedRecords was persisted at: {}",
inputRDD.getStorageLevel());
}
// Handle records update with clustering
@@ -162,13 +167,14 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
LOG.info("Num spark partitions for inputRecords before triggering workload
profile {}", inputRecordsWithClusteringUpdate.getNumPartitions());
context.setJobStatus(this.getClass().getSimpleName(), "Building workload
profile:" + config.getTableName());
- HoodieTimer sourceReadAndIndexTimer = HoodieTimer.start(); // time taken
from dedup -> tag location -> building workload profile
WorkloadProfile workloadProfile =
new WorkloadProfile(buildProfile(inputRecordsWithClusteringUpdate), operationType, table.getIndex().canIndexLogFiles());
- LOG.debug("Input workload profile :" + workloadProfile);
- long sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.endTimer();
- LOG.info("Source read and index timer " + sourceReadAndIndexDurationMs);
-
+ LOG.debug("Input workload profile :{}", workloadProfile);
+ Long sourceReadAndIndexDurationMs = null;
+ if (sourceReadAndIndexTimer.isPresent()) {
+ sourceReadAndIndexDurationMs = sourceReadAndIndexTimer.get().endTimer();
+ LOG.info("Source read and index timer {}", sourceReadAndIndexDurationMs);
+ }
// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(workloadProfile);
saveWorkloadProfileMetadataToInflight(workloadProfile, instantTime);
@@ -177,7 +183,9 @@ public abstract class BaseSparkCommitActionExecutor<T> extends
HoodieData<WriteStatus> writeStatuses = mapPartitionsAsRDD(inputRecordsWithClusteringUpdate, partitioner);
HoodieWriteMetadata<HoodieData<WriteStatus>> result = new HoodieWriteMetadata<>();
updateIndexAndCommitIfNeeded(writeStatuses, result);
- result.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs);
+ if (sourceReadAndIndexTimer.isPresent()) {
+ result.setSourceReadAndIndexDurationMs(sourceReadAndIndexDurationMs);
+ }
return result;
}
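
One subtlety in the final Spark hunk: sourceReadAndIndexDurationMs became a boxed Long initialized to null, and both the log line and the setter are fenced by isPresent(), so nothing is recorded (and no accidental null unboxing occurs) when a caller comes in through the old single-argument execute. A reduced sketch of that guard, with stand-in types rather than Hudi's:

    import java.util.Optional;

    public class GuardedDurationSketch {
      public static void main(String[] args) {
        report(Optional.empty());                // old entry point: records nothing
        report(Optional.of(System.nanoTime()));  // new entry point: records duration
      }

      static void report(Optional<Long> timerStartNanos) {
        Long durationMs = null; // boxed on purpose: "not measured" stays null
        if (timerStartNanos.isPresent()) {
          durationMs = (System.nanoTime() - timerStartNanos.get()) / 1_000_000L;
          System.out.println("Source read and index timer " + durationMs);
        }
        if (timerStartNanos.isPresent()) {
          System.out.println("setSourceReadAndIndexDurationMs(" + durationMs + ")");
        } // else: leave the write metadata untouched
      }
    }
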