This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 92fc0c09192 [HUDI-7165][FOLLOW-UP] Add test case for stopping 
heartbeat for un-committed events (#10230)
92fc0c09192 is described below

commit 92fc0c0919278b6e43a7c45b92c80be7a39525ec
Author: ksmou <[email protected]>
AuthorDate: Tue Dec 5 10:29:29 2023 +0800

    [HUDI-7165][FOLLOW-UP] Add test case for stopping heartbeat for 
un-committed events (#10230)
---
 .../sink/TestStreamWriteOperatorCoordinator.java   | 38 ++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 0f3d1947128..5cbe9899b8d 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -19,7 +19,9 @@
 package org.apache.hudi.sink;
 
 import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -65,7 +67,9 @@ import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.CoreMatchers.startsWith;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -185,6 +189,40 @@ public class TestStreamWriteOperatorCoordinator {
     assertThat("Recommits the instant with partial uncommitted events", 
lastCompleted, is(instant));
   }
 
+  @Test
+  public void testStopHeartbeatForUncommittedEventWithLazyCleanPolicy() throws 
Exception {
+    // reset
+    reset();
+    // override the default configuration
+    Configuration conf = 
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+    conf.setString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), 
HoodieFailedWritesCleaningPolicy.LAZY.name());
+    OperatorCoordinator.Context context = new 
MockOperatorCoordinatorContext(new OperatorID(), 1);
+    coordinator = new StreamWriteOperatorCoordinator(conf, context);
+    coordinator.start();
+    coordinator.setExecutor(new MockCoordinatorExecutor(context));
+
+    
assertTrue(coordinator.getWriteClient().getConfig().getFailedWritesCleanPolicy().isLazy());
+
+    final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
+
+    // start one instant and not commit it
+    coordinator.handleEventFromOperator(0, event0);
+    String instant = coordinator.getInstant();
+    HoodieHeartbeatClient heartbeatClient = 
coordinator.getWriteClient().getHeartbeatClient();
+    assertNotNull(heartbeatClient.getHeartbeat(instant), "Heartbeat is 
missing");
+
+    String basePath = tempFile.getAbsolutePath();
+    HoodieWrapperFileSystem fs = 
coordinator.getWriteClient().getHoodieTable().getMetaClient().getFs();
+
+    assertTrue(HoodieHeartbeatClient.heartbeatExists(fs, basePath, instant), 
"Heartbeat is existed");
+
+    // send bootstrap event to stop the heartbeat for this instant
+    WriteMetadataEvent event1 = WriteMetadataEvent.emptyBootstrap(0);
+    coordinator.handleEventFromOperator(0, event1);
+
+    assertFalse(HoodieHeartbeatClient.heartbeatExists(fs, basePath, instant), 
"Heartbeat is stopped and cleared");
+  }
+
   @Test
   public void testRecommitWithLazyFailedWritesCleanPolicy() {
     
coordinator.getWriteClient().getConfig().setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY,
 HoodieFailedWritesCleaningPolicy.LAZY.name());

Reply via email to