This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6ed0fca1520 [HUDI-6501] HoodieHeartbeatClient should stop all
heartbeats and not delete heartbeat files for close (#9160)
6ed0fca1520 is described below
commit 6ed0fca1520b9b6be9d98b7a03177a58ca799296
Author: Nicholas Jiang <[email protected]>
AuthorDate: Thu Jul 20 22:15:27 2023 +0800
[HUDI-6501] HoodieHeartbeatClient should stop all heartbeats and not delete
heartbeat files for close (#9160)
---
.../org/apache/hudi/client/BaseHoodieClient.java | 2 +-
.../client/heartbeat/HoodieHeartbeatClient.java | 76 ++++++++++++----------
.../heartbeat/TestHoodieHeartbeatClient.java | 10 +++
.../hudi/testutils/HoodieClientTestBase.java | 1 +
4 files changed, 52 insertions(+), 37 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 26b10c1c1bf..ed5b71d96b1 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -107,7 +107,7 @@ public abstract class BaseHoodieClient implements
Serializable, AutoCloseable {
public void close() {
stopEmbeddedServerView(true);
this.context.setJobStatus("", "");
- this.heartbeatClient.stop();
+ this.heartbeatClient.close();
this.txnManager.close();
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
index 0bc4aacf178..76bdbc46174 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieHeartbeatException;
-import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
@@ -34,14 +33,10 @@ import javax.annotation.concurrent.NotThreadSafe;
import java.io.IOException;
import java.io.OutputStream;
import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.stream.Collectors;
import static
org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils.getLastHeartbeatTime;
@@ -58,12 +53,11 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
private final transient FileSystem fs;
private final String basePath;
// path to the heartbeat folder where all writers are updating their
heartbeats
- private String heartbeatFolderPath;
+ private final String heartbeatFolderPath;
// heartbeat interval in millis
private final Long heartbeatIntervalInMs;
- private Integer numTolerableHeartbeatMisses;
private final Long maxAllowableHeartbeatIntervalInMs;
- private Map<String, Heartbeat> instantToHeartbeatMap;
+ private final Map<String, Heartbeat> instantToHeartbeatMap;
public HoodieHeartbeatClient(FileSystem fs, String basePath, Long
heartbeatIntervalInMs,
Integer numTolerableHeartbeatMisses) {
@@ -72,12 +66,11 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
this.basePath = basePath;
this.heartbeatFolderPath =
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
this.heartbeatIntervalInMs = heartbeatIntervalInMs;
- this.numTolerableHeartbeatMisses = numTolerableHeartbeatMisses;
- this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs *
this.numTolerableHeartbeatMisses;
+ this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs *
numTolerableHeartbeatMisses;
this.instantToHeartbeatMap = new HashMap<>();
}
- class Heartbeat {
+ static class Heartbeat {
private String instantTime;
private Boolean isHeartbeatStarted = false;
@@ -163,7 +156,8 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
/**
* Start a new heartbeat for the specified instant. If there is already one
running, this will be a NO_OP
- * @param instantTime
+ *
+ * @param instantTime The instant time for the heartbeat.
*/
public void start(String instantTime) {
LOG.info("Received request to start heartbeat for instant time " +
instantTime);
@@ -185,36 +179,55 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
}
/**
- * Stops the heartbeat for the specified instant.
- * @param instantTime
+ * Stops the heartbeat and deletes the heartbeat file for the specified
instant.
+ *
+ * @param instantTime The instant time for the heartbeat.
* @throws HoodieException
*/
public void stop(String instantTime) throws HoodieException {
Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
- if (heartbeat != null && heartbeat.isHeartbeatStarted() &&
!heartbeat.isHeartbeatStopped()) {
- LOG.info("Stopping heartbeat for instant " + instantTime);
- heartbeat.getTimer().cancel();
- heartbeat.setHeartbeatStopped(true);
- LOG.info("Stopped heartbeat for instant " + instantTime);
+ if (isHeartbeatStarted(heartbeat)) {
+ stopHeartbeatTimer(heartbeat);
HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instantTime);
LOG.info("Deleted heartbeat file for instant " + instantTime);
}
}
/**
- * Stops all heartbeats started via this instance of the client.
+ * Stops all timers of heartbeats started via this instance of the client.
+ *
* @throws HoodieException
*/
- public void stop() throws HoodieException {
- instantToHeartbeatMap.values().forEach(heartbeat ->
stop(heartbeat.getInstantTime()));
+ public void stopHeartbeatTimers() throws HoodieException {
+
instantToHeartbeatMap.values().stream().filter(this::isHeartbeatStarted).forEach(this::stopHeartbeatTimer);
+ }
+
+ /**
+ * Whether the given heartbeat is started.
+ *
+ * @param heartbeat The heartbeat to check; may be null.
+ * @return Whether the heartbeat is non-null, has been started, and has not
+ *         been stopped.
+ */
+ private boolean isHeartbeatStarted(Heartbeat heartbeat) {
+ return heartbeat != null && heartbeat.isHeartbeatStarted() &&
!heartbeat.isHeartbeatStopped();
+ }
+
+ /**
+ * Stops the timer of the given heartbeat.
+ *
+ * @param heartbeat The heartbeat to stop.
+ */
+ private void stopHeartbeatTimer(Heartbeat heartbeat) {
+ LOG.info("Stopping heartbeat for instant " + heartbeat.getInstantTime());
+ heartbeat.getTimer().cancel();
+ heartbeat.setHeartbeatStopped(true);
+ LOG.info("Stopped heartbeat for instant " + heartbeat.getInstantTime());
}
public static Boolean heartbeatExists(FileSystem fs, String basePath, String
instantTime) throws IOException {
Path heartbeatFilePath = new
Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR +
instantTime);
- if (fs.exists(heartbeatFilePath)) {
- return true;
- }
- return false;
+ return fs.exists(heartbeatFilePath);
}
public boolean isHeartbeatExpired(String instantTime) throws IOException {
@@ -236,15 +249,6 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
return false;
}
- public List<String> getAllExistingHeartbeatInstants() throws IOException {
- Path heartbeatFolder = new Path(heartbeatFolderPath);
- if (this.fs.exists(heartbeatFolder)) {
- FileStatus[] fileStatus = this.fs.listStatus(new
Path(heartbeatFolderPath));
- return Arrays.stream(fileStatus).map(fs ->
fs.getPath().getName()).collect(Collectors.toList());
- }
- return Collections.EMPTY_LIST;
- }
-
private void updateHeartbeat(String instantTime) throws
HoodieHeartbeatException {
try {
Long newHeartbeatTime = System.currentTimeMillis();
@@ -276,7 +280,7 @@ public class HoodieHeartbeatClient implements
AutoCloseable, Serializable {
@Override
public void close() {
- this.stop();
+ this.stopHeartbeatTimers();
this.instantToHeartbeatMap.clear();
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
index 88fe28edb4e..a877d6bfc23 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
@@ -90,4 +90,14 @@ public class TestHoodieHeartbeatClient extends
HoodieCommonTestHarness {
hoodieHeartbeatClient.stop(instantTime1);
assertFalse(HeartbeatUtils.deleteHeartbeatFile(metaClient.getFs(),
basePath, instantTime2));
}
+
+ @Test
+ public void testStopHeartbeatTimers() throws IOException {
+ HoodieHeartbeatClient hoodieHeartbeatClient =
+ new HoodieHeartbeatClient(metaClient.getFs(),
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+ hoodieHeartbeatClient.start(instantTime1);
+ hoodieHeartbeatClient.stopHeartbeatTimers();
+ assertFalse(hoodieHeartbeatClient.isHeartbeatExpired(instantTime1));
+
assertTrue(hoodieHeartbeatClient.getHeartbeat(instantTime1).isHeartbeatStopped());
+ }
}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 1642715851e..454236b4278 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -770,6 +770,7 @@ public class HoodieClientTestBase extends
HoodieClientTestHarness {
HoodieWriteConfig hoodieWriteConfig =
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
.withAutoCommit(false) // disable auto commit
.withRollbackUsingMarkers(true)
+ .withHeartbeatTolerableMisses(0)
.build();
try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig))
{