This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c9a1ddc1b2 [HUDI-9303] Fix timer is not reset properly after RowDataLogWriteHandle flushing a data block (#13132)
7c9a1ddc1b2 is described below

commit 7c9a1ddc1b2b39b5a3a634691a83054b1b876f0b
Author: Shuo Cheng <[email protected]>
AuthorDate: Fri Apr 11 10:43:34 2025 +0800

    [HUDI-9303] Fix timer is not reset properly after RowDataLogWriteHandle flushing a data block (#13132)
---
 .../org/apache/hudi/io/v2/RowDataLogWriteHandle.java |  1 +
 .../org/apache/hudi/sink/TestWriteMergeOnRead.java   | 20 ++++++++++++++++++++
 2 files changed, 21 insertions(+)

diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
index 281b197171b..f89169e75fd 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/io/v2/RowDataLogWriteHandle.java
@@ -149,6 +149,7 @@ public class RowDataLogWriteHandle<T, I, K, O>
     assert stat.getRuntimeStats() != null;
     LOG.info("WriteHandle for partitionPath {} filePath {}, took {} ms.",
        partitionPath, stat.getPath(), stat.getRuntimeStats().getTotalUpsertTime());
+    timer.startTimer();
   }
 
   /**
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
index 94214661ea8..ce0581f3300 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestWriteMergeOnRead.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.sink;
 
+import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.EventTimeAvroPayload;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.config.HoodieClusteringConfig;
@@ -201,6 +202,25 @@ public class TestWriteMergeOnRead extends 
TestWriteCopyOnWrite {
         .end();
   }
 
+  @Test
+  void testWriteMorWithSmallLogBlock() throws Exception {
+    // total 5 records, average records size is 48,
+    // set max block size as 128 to trigger a flush during write log data blocks
+    conf.setString(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE.key(), "128");
+
+    Map<String, String> expected = new HashMap<>();
+    expected.put("par1", "[id1,par1,id1,Danny,23,4,par1]");
+
+    preparePipeline()
+        .consume(TestData.DATA_SET_INSERT_SAME_KEY)
+        .assertEmptyDataFiles()
+        .checkpoint(1)
+        .assertNextEvent()
+        .checkpointComplete(1)
+        .checkWrittenData(expected, 1)
+        .end();
+  }
+
   @Override
   protected Map<String, String> getExpectedBeforeCheckpointComplete() {
     return EXPECTED1;

Reply via email to