This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 33f43091a84 [HUDI-7381] Fix compaction write stats and metrics for create and upsert time (#10619)
33f43091a84 is described below
commit 33f43091a840f445e9ab50bbdaccfa3b91db9fdb
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Tue Feb 13 15:20:06 2024 -0800
[HUDI-7381] Fix compaction write stats and metrics for create and upsert time (#10619)
Co-authored-by: rmahindra123 <[email protected]>
---
.../hudi/table/action/compact/HoodieCompactor.java | 26 ++++++++++++++--------
.../table/action/compact/TestHoodieCompactor.java | 12 +++++++++-
.../apache/hudi/common/model/HoodieWriteStat.java | 12 +++-------
3 files changed, 31 insertions(+), 19 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index d1d69be16dc..940ab9886c3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -239,18 +240,25 @@ public abstract class HoodieCompactor<T, I, K, O>
implements Serializable {
scanner.close();
Iterable<List<WriteStatus>> resultIterable = () -> result;
return StreamSupport.stream(resultIterable.spliterator(),
false).flatMap(Collection::stream).peek(s -> {
-
s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
- s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
- s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
- s.getStat().setPartitionPath(operation.getPartitionPath());
- s.getStat()
+ final HoodieWriteStat stat = s.getStat();
+ stat.setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
+ stat.setTotalLogFilesCompacted(scanner.getTotalLogFiles());
+ stat.setTotalLogRecords(scanner.getTotalLogRecords());
+ stat.setPartitionPath(operation.getPartitionPath());
+ stat
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
- s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
- s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
- s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
+ stat.setTotalLogBlocks(scanner.getTotalLogBlocks());
+ stat.setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
+ stat.setTotalRollbackBlocks(scanner.getTotalRollbacks());
RuntimeStats runtimeStats = new RuntimeStats();
+ // scan time has to be obtained from scanner.
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
- s.getStat().setRuntimeStats(runtimeStats);
+ // create and upsert time are obtained from the create or merge handle.
+ if (stat.getRuntimeStats() != null) {
+
runtimeStats.setTotalCreateTime(stat.getRuntimeStats().getTotalCreateTime());
+
runtimeStats.setTotalUpsertTime(stat.getRuntimeStats().getTotalUpsertTime());
+ }
+ stat.setRuntimeStats(runtimeStats);
}).collect(toList());
}
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 7fd0f40f2ba..313f14ce989 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.data.HoodieListData;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -63,6 +64,7 @@ import java.util.stream.Collectors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -298,9 +300,17 @@ public class TestHoodieCompactor extends
HoodieSparkClientTestHarness {
* Verify that all partition paths are present in the WriteStatus result.
*/
private void verifyCompaction(HoodieData<WriteStatus> result) {
+ List<WriteStatus> writeStatuses = result.collectAsList();
for (String partitionPath : dataGen.getPartitionPaths()) {
- List<WriteStatus> writeStatuses = result.collectAsList();
assertTrue(writeStatuses.stream().anyMatch(writeStatus ->
writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
}
+
+ writeStatuses.forEach(writeStatus -> {
+ final HoodieWriteStat.RuntimeStats stats =
writeStatus.getStat().getRuntimeStats();
+ assertNotNull(stats);
+ assertEquals(stats.getTotalCreateTime(), 0);
+ assertTrue(stats.getTotalUpsertTime() > 0);
+ assertTrue(stats.getTotalScanTime() > 0);
+ });
}
}
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 095c1b38387..59da7ed7f49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -412,45 +412,39 @@ public class HoodieWriteStat implements Serializable {
/**
* Total time taken to read and merge logblocks in a log file.
*/
- @Nullable
private long totalScanTime;
/**
* Total time taken by a Hoodie Merge for an existing file.
*/
- @Nullable
private long totalUpsertTime;
/**
* Total time taken by a Hoodie Insert to a file.
*/
- @Nullable
private long totalCreateTime;
- @Nullable
public long getTotalScanTime() {
return totalScanTime;
}
- public void setTotalScanTime(@Nullable long totalScanTime) {
+ public void setTotalScanTime(long totalScanTime) {
this.totalScanTime = totalScanTime;
}
- @Nullable
public long getTotalUpsertTime() {
return totalUpsertTime;
}
- public void setTotalUpsertTime(@Nullable long totalUpsertTime) {
+ public void setTotalUpsertTime(long totalUpsertTime) {
this.totalUpsertTime = totalUpsertTime;
}
- @Nullable
public long getTotalCreateTime() {
return totalCreateTime;
}
- public void setTotalCreateTime(@Nullable long totalCreateTime) {
+ public void setTotalCreateTime(long totalCreateTime) {
this.totalCreateTime = totalCreateTime;
}
}