This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 33f43091a84 [HUDI-7381] Fix compaction write stats and metrics for create and upsert time (#10619)
33f43091a84 is described below

commit 33f43091a840f445e9ab50bbdaccfa3b91db9fdb
Author: Rajesh Mahindra <[email protected]>
AuthorDate: Tue Feb 13 15:20:06 2024 -0800

    [HUDI-7381] Fix compaction write stats and metrics for create and upsert time (#10619)
    
    Co-authored-by: rmahindra123 <[email protected]>
---
 .../hudi/table/action/compact/HoodieCompactor.java | 26 ++++++++++++++--------
 .../table/action/compact/TestHoodieCompactor.java  | 12 +++++++++-
 .../apache/hudi/common/model/HoodieWriteStat.java  | 12 +++-------
 3 files changed, 31 insertions(+), 19 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
index d1d69be16dc..940ab9886c3 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.engine.TaskContextSupplier;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.CompactionOperation;
 import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.HoodieWriteStat.RuntimeStats;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -239,18 +240,25 @@ public abstract class HoodieCompactor<T, I, K, O> implements Serializable {
     scanner.close();
     Iterable<List<WriteStatus>> resultIterable = () -> result;
     return StreamSupport.stream(resultIterable.spliterator(), 
false).flatMap(Collection::stream).peek(s -> {
-      
s.getStat().setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
-      s.getStat().setTotalLogFilesCompacted(scanner.getTotalLogFiles());
-      s.getStat().setTotalLogRecords(scanner.getTotalLogRecords());
-      s.getStat().setPartitionPath(operation.getPartitionPath());
-      s.getStat()
+      final HoodieWriteStat stat = s.getStat();
+      stat.setTotalUpdatedRecordsCompacted(scanner.getNumMergedRecordsInLog());
+      stat.setTotalLogFilesCompacted(scanner.getTotalLogFiles());
+      stat.setTotalLogRecords(scanner.getTotalLogRecords());
+      stat.setPartitionPath(operation.getPartitionPath());
+      stat
           
.setTotalLogSizeCompacted(operation.getMetrics().get(CompactionStrategy.TOTAL_LOG_FILE_SIZE).longValue());
-      s.getStat().setTotalLogBlocks(scanner.getTotalLogBlocks());
-      s.getStat().setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
-      s.getStat().setTotalRollbackBlocks(scanner.getTotalRollbacks());
+      stat.setTotalLogBlocks(scanner.getTotalLogBlocks());
+      stat.setTotalCorruptLogBlock(scanner.getTotalCorruptBlocks());
+      stat.setTotalRollbackBlocks(scanner.getTotalRollbacks());
       RuntimeStats runtimeStats = new RuntimeStats();
+      // scan time has to be obtained from scanner.
       
runtimeStats.setTotalScanTime(scanner.getTotalTimeTakenToReadAndMergeBlocks());
-      s.getStat().setRuntimeStats(runtimeStats);
+      // create and upsert time are obtained from the create or merge handle.
+      if (stat.getRuntimeStats() != null) {
+        
runtimeStats.setTotalCreateTime(stat.getRuntimeStats().getTotalCreateTime());
+        
runtimeStats.setTotalUpsertTime(stat.getRuntimeStats().getTotalUpsertTime());
+      }
+      stat.setRuntimeStats(runtimeStats);
     }).collect(toList());
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index 7fd0f40f2ba..313f14ce989 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -28,6 +28,7 @@ import org.apache.hudi.common.data.HoodieListData;
 import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieInstant.State;
@@ -63,6 +64,7 @@ import java.util.stream.Collectors;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -298,9 +300,17 @@ public class TestHoodieCompactor extends HoodieSparkClientTestHarness {
    * Verify that all partition paths are present in the WriteStatus result.
    */
   private void verifyCompaction(HoodieData<WriteStatus> result) {
+    List<WriteStatus> writeStatuses = result.collectAsList();
     for (String partitionPath : dataGen.getPartitionPaths()) {
-      List<WriteStatus> writeStatuses = result.collectAsList();
       assertTrue(writeStatuses.stream().anyMatch(writeStatus -> 
writeStatus.getStat().getPartitionPath().contentEquals(partitionPath)));
     }
+
+    writeStatuses.forEach(writeStatus -> {
+      final HoodieWriteStat.RuntimeStats stats = 
writeStatus.getStat().getRuntimeStats();
+      assertNotNull(stats);
+      assertEquals(stats.getTotalCreateTime(), 0);
+      assertTrue(stats.getTotalUpsertTime() > 0);
+      assertTrue(stats.getTotalScanTime() > 0);
+    });
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
index 095c1b38387..59da7ed7f49 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/model/HoodieWriteStat.java
@@ -412,45 +412,39 @@ public class HoodieWriteStat implements Serializable {
     /**
      * Total time taken to read and merge logblocks in a log file.
      */
-    @Nullable
     private long totalScanTime;
 
     /**
      * Total time taken by a Hoodie Merge for an existing file.
      */
-    @Nullable
     private long totalUpsertTime;
 
     /**
      * Total time taken by a Hoodie Insert to a file.
      */
-    @Nullable
     private long totalCreateTime;
 
-    @Nullable
     public long getTotalScanTime() {
       return totalScanTime;
     }
 
-    public void setTotalScanTime(@Nullable long totalScanTime) {
+    public void setTotalScanTime(long totalScanTime) {
       this.totalScanTime = totalScanTime;
     }
 
-    @Nullable
     public long getTotalUpsertTime() {
       return totalUpsertTime;
     }
 
-    public void setTotalUpsertTime(@Nullable long totalUpsertTime) {
+    public void setTotalUpsertTime(long totalUpsertTime) {
       this.totalUpsertTime = totalUpsertTime;
     }
 
-    @Nullable
     public long getTotalCreateTime() {
       return totalCreateTime;
     }
 
-    public void setTotalCreateTime(@Nullable long totalCreateTime) {
+    public void setTotalCreateTime(long totalCreateTime) {
       this.totalCreateTime = totalCreateTime;
     }
   }

Reply via email to