This is an automated email from the ASF dual-hosted git repository.

leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 6ed0fca1520 [HUDI-6501] HoodieHeartbeatClient should stop all 
heartbeats and not delete heartbeat files for close (#9160)
6ed0fca1520 is described below

commit 6ed0fca1520b9b6be9d98b7a03177a58ca799296
Author: Nicholas Jiang <[email protected]>
AuthorDate: Thu Jul 20 22:15:27 2023 +0800

    [HUDI-6501] HoodieHeartbeatClient should stop all heartbeats and not delete 
heartbeat files for close (#9160)
---
 .../org/apache/hudi/client/BaseHoodieClient.java   |  2 +-
 .../client/heartbeat/HoodieHeartbeatClient.java    | 76 ++++++++++++----------
 .../heartbeat/TestHoodieHeartbeatClient.java       | 10 +++
 .../hudi/testutils/HoodieClientTestBase.java       |  1 +
 4 files changed, 52 insertions(+), 37 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
index 26b10c1c1bf..ed5b71d96b1 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieClient.java
@@ -107,7 +107,7 @@ public abstract class BaseHoodieClient implements 
Serializable, AutoCloseable {
   public void close() {
     stopEmbeddedServerView(true);
     this.context.setJobStatus("", "");
-    this.heartbeatClient.stop();
+    this.heartbeatClient.close();
     this.txnManager.close();
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
index 0bc4aacf178..76bdbc46174 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
@@ -23,7 +23,6 @@ import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieHeartbeatException;
 
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.slf4j.Logger;
@@ -34,14 +33,10 @@ import javax.annotation.concurrent.NotThreadSafe;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Timer;
 import java.util.TimerTask;
-import java.util.stream.Collectors;
 
 import static 
org.apache.hudi.common.heartbeat.HoodieHeartbeatUtils.getLastHeartbeatTime;
 
@@ -58,12 +53,11 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
   private final transient FileSystem fs;
   private final String basePath;
   // path to the heartbeat folder where all writers are updating their 
heartbeats
-  private String heartbeatFolderPath;
+  private final String heartbeatFolderPath;
   // heartbeat interval in millis
   private final Long heartbeatIntervalInMs;
-  private Integer numTolerableHeartbeatMisses;
   private final Long maxAllowableHeartbeatIntervalInMs;
-  private Map<String, Heartbeat> instantToHeartbeatMap;
+  private final Map<String, Heartbeat> instantToHeartbeatMap;
 
   public HoodieHeartbeatClient(FileSystem fs, String basePath, Long 
heartbeatIntervalInMs,
                                Integer numTolerableHeartbeatMisses) {
@@ -72,12 +66,11 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
     this.basePath = basePath;
     this.heartbeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
     this.heartbeatIntervalInMs = heartbeatIntervalInMs;
-    this.numTolerableHeartbeatMisses = numTolerableHeartbeatMisses;
-    this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs * 
this.numTolerableHeartbeatMisses;
+    this.maxAllowableHeartbeatIntervalInMs = this.heartbeatIntervalInMs * 
numTolerableHeartbeatMisses;
     this.instantToHeartbeatMap = new HashMap<>();
   }
 
-  class Heartbeat {
+  static class Heartbeat {
 
     private String instantTime;
     private Boolean isHeartbeatStarted = false;
@@ -163,7 +156,8 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
 
   /**
    * Start a new heartbeat for the specified instant. If there is already one 
running, this will be a NO_OP
-   * @param instantTime
+   *
+   * @param instantTime The instant time for the heartbeat.
    */
   public void start(String instantTime) {
     LOG.info("Received request to start heartbeat for instant time " + 
instantTime);
@@ -185,36 +179,55 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
   }
 
   /**
-   * Stops the heartbeat for the specified instant.
-   * @param instantTime
+   * Stops the heartbeat and deletes the heartbeat file for the specified 
instant.
+   *
+   * @param instantTime The instant time for the heartbeat.
    * @throws HoodieException
    */
   public void stop(String instantTime) throws HoodieException {
     Heartbeat heartbeat = instantToHeartbeatMap.get(instantTime);
-    if (heartbeat != null && heartbeat.isHeartbeatStarted() && 
!heartbeat.isHeartbeatStopped()) {
-      LOG.info("Stopping heartbeat for instant " + instantTime);
-      heartbeat.getTimer().cancel();
-      heartbeat.setHeartbeatStopped(true);
-      LOG.info("Stopped heartbeat for instant " + instantTime);
+    if (isHeartbeatStarted(heartbeat)) {
+      stopHeartbeatTimer(heartbeat);
       HeartbeatUtils.deleteHeartbeatFile(fs, basePath, instantTime);
       LOG.info("Deleted heartbeat file for instant " + instantTime);
     }
   }
 
   /**
-   * Stops all heartbeats started via this instance of the client.
+   * Stops all timers of heartbeats started via this instance of the client.
+   *
    * @throws HoodieException
    */
-  public void stop() throws HoodieException {
-    instantToHeartbeatMap.values().forEach(heartbeat -> 
stop(heartbeat.getInstantTime()));
+  public void stopHeartbeatTimers() throws HoodieException {
+    
instantToHeartbeatMap.values().stream().filter(this::isHeartbeatStarted).forEach(this::stopHeartbeatTimer);
+  }
+
+  /**
+   * Whether the given heartbeat is started.
+   *
+   * @param heartbeat The heartbeat to check whether is started.
+   * @return Whether the heartbeat is started.
+   * @throws IOException
+   */
+  private boolean isHeartbeatStarted(Heartbeat heartbeat) {
+    return heartbeat != null && heartbeat.isHeartbeatStarted() && 
!heartbeat.isHeartbeatStopped();
+  }
+
+  /**
+   * Stops the timer of the given heartbeat.
+   *
+   * @param heartbeat The heartbeat to stop.
+   */
+  private void stopHeartbeatTimer(Heartbeat heartbeat) {
+    LOG.info("Stopping heartbeat for instant " + heartbeat.getInstantTime());
+    heartbeat.getTimer().cancel();
+    heartbeat.setHeartbeatStopped(true);
+    LOG.info("Stopped heartbeat for instant " + heartbeat.getInstantTime());
   }
 
   public static Boolean heartbeatExists(FileSystem fs, String basePath, String 
instantTime) throws IOException {
     Path heartbeatFilePath = new 
Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + 
instantTime);
-    if (fs.exists(heartbeatFilePath)) {
-      return true;
-    }
-    return false;
+    return fs.exists(heartbeatFilePath);
   }
 
   public boolean isHeartbeatExpired(String instantTime) throws IOException {
@@ -236,15 +249,6 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
     return false;
   }
 
-  public List<String> getAllExistingHeartbeatInstants() throws IOException {
-    Path heartbeatFolder = new Path(heartbeatFolderPath);
-    if (this.fs.exists(heartbeatFolder)) {
-      FileStatus[] fileStatus = this.fs.listStatus(new 
Path(heartbeatFolderPath));
-      return Arrays.stream(fileStatus).map(fs -> 
fs.getPath().getName()).collect(Collectors.toList());
-    }
-    return Collections.EMPTY_LIST;
-  }
-
   private void updateHeartbeat(String instantTime) throws 
HoodieHeartbeatException {
     try {
       Long newHeartbeatTime = System.currentTimeMillis();
@@ -276,7 +280,7 @@ public class HoodieHeartbeatClient implements 
AutoCloseable, Serializable {
 
   @Override
   public void close() {
-    this.stop();
+    this.stopHeartbeatTimers();
     this.instantToHeartbeatMap.clear();
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
index 88fe28edb4e..a877d6bfc23 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
@@ -90,4 +90,14 @@ public class TestHoodieHeartbeatClient extends 
HoodieCommonTestHarness {
     hoodieHeartbeatClient.stop(instantTime1);
     assertFalse(HeartbeatUtils.deleteHeartbeatFile(metaClient.getFs(), 
basePath, instantTime2));
   }
+
+  @Test
+  public void testStopHeartbeatTimers() throws IOException {
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(metaClient.getFs(), 
metaClient.getBasePath(), heartBeatInterval, numTolerableMisses);
+    hoodieHeartbeatClient.start(instantTime1);
+    hoodieHeartbeatClient.stopHeartbeatTimers();
+    assertFalse(hoodieHeartbeatClient.isHeartbeatExpired(instantTime1));
+    
assertTrue(hoodieHeartbeatClient.getHeartbeat(instantTime1).isHeartbeatStopped());
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
index 1642715851e..454236b4278 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestBase.java
@@ -770,6 +770,7 @@ public class HoodieClientTestBase extends 
HoodieClientTestHarness {
     HoodieWriteConfig hoodieWriteConfig = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY)
         .withAutoCommit(false) // disable auto commit
         .withRollbackUsingMarkers(true)
+        .withHeartbeatTolerableMisses(0)
         .build();
 
     try (SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig)) 
{

Reply via email to