This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 92fc0c09192 [HUDI-7165][FOLLOW-UP] Add test case for stopping
heartbeat for un-committed events (#10230)
92fc0c09192 is described below
commit 92fc0c0919278b6e43a7c45b92c80be7a39525ec
Author: ksmou <[email protected]>
AuthorDate: Tue Dec 5 10:29:29 2023 +0800
[HUDI-7165][FOLLOW-UP] Add test case for stopping heartbeat for
un-committed events (#10230)
---
.../sink/TestStreamWriteOperatorCoordinator.java | 38 ++++++++++++++++++++++
1 file changed, 38 insertions(+)
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
index 0f3d1947128..5cbe9899b8d 100644
---
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/TestStreamWriteOperatorCoordinator.java
@@ -19,7 +19,9 @@
package org.apache.hudi.sink;
import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteConcurrencyMode;
@@ -65,7 +67,9 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.startsWith;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -185,6 +189,40 @@ public class TestStreamWriteOperatorCoordinator {
assertThat("Recommits the instant with partial uncommitted events",
lastCompleted, is(instant));
}
+ @Test
+ public void testStopHeartbeatForUncommittedEventWithLazyCleanPolicy() throws
Exception {
+ // reset before building a new coordinator with a custom configuration
+ reset();
+ // override the default configuration: use the LAZY failed-writes cleaning policy
+ Configuration conf =
TestConfigurations.getDefaultConf(tempFile.getAbsolutePath());
+ conf.setString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.LAZY.name());
+ OperatorCoordinator.Context context = new
MockOperatorCoordinatorContext(new OperatorID(), 1);
+ coordinator = new StreamWriteOperatorCoordinator(conf, context);
+ coordinator.start();
+ coordinator.setExecutor(new MockCoordinatorExecutor(context));
+
+
assertTrue(coordinator.getWriteClient().getConfig().getFailedWritesCleanPolicy().isLazy());
+
+ final WriteMetadataEvent event0 = WriteMetadataEvent.emptyBootstrap(0);
+
+ // start one instant but do not commit it; a heartbeat is expected to exist for it
+ coordinator.handleEventFromOperator(0, event0);
+ String instant = coordinator.getInstant();
+ HoodieHeartbeatClient heartbeatClient =
coordinator.getWriteClient().getHeartbeatClient();
+ assertNotNull(heartbeatClient.getHeartbeat(instant), "Heartbeat is
missing");
+
+ String basePath = tempFile.getAbsolutePath();
+ HoodieWrapperFileSystem fs =
coordinator.getWriteClient().getHoodieTable().getMetaClient().getFs();
+
+ assertTrue(HoodieHeartbeatClient.heartbeatExists(fs, basePath, instant),
"Heartbeat is existed");
+
+ // send a second bootstrap event; the heartbeat of the un-committed instant must be gone afterwards
+ WriteMetadataEvent event1 = WriteMetadataEvent.emptyBootstrap(0);
+ coordinator.handleEventFromOperator(0, event1);
+
+ assertFalse(HoodieHeartbeatClient.heartbeatExists(fs, basePath, instant),
"Heartbeat is stopped and cleared");
+ }
+
@Test
public void testRecommitWithLazyFailedWritesCleanPolicy() {
coordinator.getWriteClient().getConfig().setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY,
HoodieFailedWritesCleaningPolicy.LAZY.name());