nsivabalan commented on a change in pull request #2359:
URL: https://github.com/apache/hudi/pull/2359#discussion_r577633594



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
##########
@@ -48,6 +49,7 @@
   protected final transient Configuration hadoopConf;
   protected final HoodieWriteConfig config;
   protected final String basePath;
+  protected HoodieHeartbeatClient heartbeatClient;

Review comment:
       Why not final?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -181,7 +171,7 @@ public boolean commitStats(String instantTime, 
List<HoodieWriteStat> stats, Opti
     HoodieCommitMetadata metadata = CommitUtils.buildMetadata(stats, 
partitionToReplaceFileIds, extraMetadata, operationType, config.getSchema(), 
commitActionType);
     // Finalize write
     finalizeWrite(table, instantTime, stats);
-
+    HeartbeatUtils.abortIfHeartbeatExpired(instantTime, table, 
heartbeatClient, config);

Review comment:
       @vinothchandar @n3nash: what is good programming practice here, or is 
   it more a matter of developer's choice? I see two approaches.

   Approach 1 (what's in this patch):
   ```
   HeartbeatUtils.abortIfHeartbea.... // throws an exception if the commit needs to be aborted
   activeTimeline.saveAsComplete.... // reached only if no exception was thrown, i.e. the heartbeat is still valid
   proceed further...
   ```
   
   Approach 2 (make HeartbeatUtils.abortIfHeart.... return a boolean indicating whether to abort):
   ```
   if (HeartbeatUtils.hasHeartBeatExpired ....)
       // abort steps
       // maybe throw an exception here
   else
       activeTimeline.saveAsComplete
       proceed further
   ```
   The second approach is more explicit. Do you have any suggestions on this?
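
To make the comparison concrete, here is a minimal sketch of both approaches side by side. All names (`HeartbeatCheckSketch`, the timestamp parameters, the `"COMMITTED"`/`"ABORTED"` results) are hypothetical stand-ins for illustration and are not the Hudi API; the expiry rule (now minus last heartbeat exceeds a timeout) is also an assumption.

```java
// Hypothetical stand-in for the heartbeat check; not the Hudi API.
class HeartbeatCheckSketch {

  // Assumed expiry rule: the last heartbeat is older than the allowed timeout.
  static boolean hasHeartbeatExpired(long lastHeartbeatMs, long nowMs, long timeoutMs) {
    return nowMs - lastHeartbeatMs > timeoutMs;
  }

  // Approach 1: the helper throws, so the caller simply falls through on success.
  static void abortIfHeartbeatExpired(long lastHeartbeatMs, long nowMs, long timeoutMs) {
    if (hasHeartbeatExpired(lastHeartbeatMs, nowMs, timeoutMs)) {
      throw new IllegalStateException("Heartbeat expired, aborting commit");
    }
  }

  // Approach 2: the caller branches explicitly on the boolean.
  static String commit(long lastHeartbeatMs, long nowMs, long timeoutMs) {
    if (hasHeartbeatExpired(lastHeartbeatMs, nowMs, timeoutMs)) {
      // abort steps would go here
      return "ABORTED";
    }
    // completing the instant on the timeline would go here
    return "COMMITTED";
  }
}
```

With approach 1 the abort path is invisible at the call site; with approach 2 it is spelled out, at the cost of every caller having to remember the branch.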

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -236,17 +226,16 @@ protected void syncTableMetadata() {
    * Main API to run bootstrap to hudi.
    */
   public void bootstrap(Option<Map<String, String>> extraMetadata) {
-    if (rollbackPending) {
-      rollBackInflightBootstrap();
-    }
+    // TODO : MULTIWRITER -> check if failed bootstrap files can be cleaned 
later
     HoodieTable<T, I, K, O> table = 
getTableAndInitCtx(WriteOperationType.UPSERT, 
HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
+    rollbackFailedBootstrap();
     table.bootstrap(context, extraMetadata);
   }
 
   /**
-   * Main API to rollback pending bootstrap.
+   * Main API to rollback failed bootstrap.
    */
-  protected void rollBackInflightBootstrap() {
+  public void rollbackFailedBootstrap() {

Review comment:
       Why does this need to be public?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -236,17 +226,16 @@ protected void syncTableMetadata() {
    * Main API to run bootstrap to hudi.
    */
   public void bootstrap(Option<Map<String, String>> extraMetadata) {
-    if (rollbackPending) {
-      rollBackInflightBootstrap();
-    }
+    // TODO : MULTIWRITER -> check if failed bootstrap files can be cleaned 
later
     HoodieTable<T, I, K, O> table = 
getTableAndInitCtx(WriteOperationType.UPSERT, 
HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
+    rollbackFailedBootstrap();

Review comment:
       If someone has set rollbackPending = false, then prior to this patch we 
   would not roll back, whereas now we always roll back. Is that intentional, 
   or is it a TODO to be fixed in a later patch?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -617,11 +610,8 @@ public HoodieCleanMetadata clean() {
    * Provides a new commit time for a write operation (insert/update/delete).
    */
   public String startCommit() {
-    // NOTE : Need to ensure that rollback is done before a new commit is 
started
-    if (rollbackPending) {
-      // Only rollback pending commit/delta-commits. Do not touch compaction 
commits
-      rollbackPendingCommits();
-    }
+    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
+        HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());

Review comment:
       Not part of this patch, but just curious: why doesn't this method 
   (startCommit) call into startCommitWithTime()?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -424,15 +411,15 @@ protected void postCommit(HoodieTable<T, I, K, O> table, 
HoodieCommitMetadata me
       HoodieTimelineArchiveLog archiveLog = new 
HoodieTimelineArchiveLog(config, table);
       archiveLog.archiveIfRequired(context);
       autoCleanOnCommit();
-
       syncTableMetadata();
+      this.heartbeatClient.stop(instantTime);

Review comment:
       Should we do this within a finally block, or is it intentional to do it here?
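
For reference, a minimal sketch of what the finally variant could look like. The class and method names below are hypothetical and only mimic the shape of postCommit; whether the heartbeat should really be stopped when a post-commit step fails is exactly the open question, and I can't tell from this hunk.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the post-commit flow; not the Hudi API.
class PostCommitSketch {
  // Records the order of calls so the behaviour can be inspected.
  final List<String> events = new ArrayList<>();

  void syncTableMetadata(boolean fail) {
    events.add("sync");
    if (fail) {
      throw new RuntimeException("sync failed");
    }
  }

  void stopHeartbeat(String instantTime) {
    events.add("stop:" + instantTime);
  }

  void postCommit(String instantTime, boolean failSync) {
    try {
      syncTableMetadata(failSync);
    } finally {
      // Runs whether or not the steps above threw.
      stopHeartbeat(instantTime);
    }
  }
}
```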

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -598,6 +588,9 @@ public HoodieRestoreMetadata restoreToInstant(final String 
instantTime) throws H
   public HoodieCleanMetadata clean(String cleanInstantTime) throws 
HoodieIOException {
     LOG.info("Cleaner started");
     final Timer.Context timerContext = metrics.getCleanCtx();
+    LOG.info("Cleaned failed attempts if any");
+    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),

Review comment:
       Can you confirm that this handles a user moving from eager to lazy 
   cleaning across successive launches? That is, if the previous launch was 
   cleaning up partial files eagerly but the process crashed before the cleanup 
   finished, the lazy code path should clean up any leftovers.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class to delete heartbeat for completed or failed instants with 
expired heartbeats.
+ */
+public class HeartbeatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HeartbeatUtils.class);
+
+  /**
+   * Deletes the heartbeat file for the specified instant.
+   * @param fs
+   * @param basePath
+   * @param instantTime
+   * @return
+   */
+  public static boolean deleteHeartbeatFile(FileSystem fs, String basePath, 
String instantTime) {
+    boolean deleted = false;
+    try {
+      String heartbeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+      deleted = fs.delete(new Path(heartbeatFolderPath + File.separator + 
instantTime), false);
+      if (!deleted) {
+        LOG.error("Failed to delete heartbeat for instant " + instantTime);

Review comment:
       Does this refer to the case of deleting an already deleted or 
   non-existent heartbeat file?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -912,5 +929,6 @@ public void close() {
     // Calling this here releases any resources used by your index, so make 
sure to finish any related operations
     // before this point
     this.index.close();
+    this.heartbeatClient.stop();

Review comment:
       Do we need to move this to a finally block?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieHeartbeatException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the heartbeat folder where all writers are updating their 
heartbeats
+  private final transient FileSystem fs;
+  private final String basePath;
+  private String heartbeatFolderPath;
+  // heartbeat interval in millis
+  private final Long heartbeatIntervalInMs;
+  private Integer numTolerableHeartbeatMisses;
+  private final Long maxAllowableHeartbeatIntervalInMs;

Review comment:
       Minor: maybe we could name this "heartBeatTimeoutInMs". Feel free to 
   take a call.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class to delete heartbeat for completed or failed instants with 
expired heartbeats.
+ */
+public class HeartbeatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HeartbeatUtils.class);
+
+  /**
+   * Deletes the heartbeat file for the specified instant.
+   * @param fs
+   * @param basePath
+   * @param instantTime
+   * @return
+   */
+  public static boolean deleteHeartbeatFile(FileSystem fs, String basePath, 
String instantTime) {
+    boolean deleted = false;
+    try {
+      String heartbeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+      deleted = fs.delete(new Path(heartbeatFolderPath + File.separator + 
instantTime), false);
+      if (!deleted) {
+        LOG.error("Failed to delete heartbeat for instant " + instantTime);
+      }
+    } catch (IOException io) {
+      LOG.error("Unable to delete heartbeat for instant " + instantTime, io);
+    }
+    return deleted;
+  }
+
+  /**
+   * Deletes the heartbeat files for instants with expired heartbeats without 
any active instant.
+   * @param existingHeartbeatInstants
+   * @param metaClient
+   * @param basePath
+   */
+  public static void cleanExpiredHeartbeats(List<String> 
existingHeartbeatInstants,
+                                            HoodieTableMetaClient metaClient, 
String basePath) {
+    Set<String> heartbeatInstants = metaClient.getActiveTimeline()
+        
.filterCompletedInstants().getInstants().map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
+    existingHeartbeatInstants.stream().forEach(instant -> {
+      if (!heartbeatInstants.contains(instant)) {
+        deleteHeartbeatFile(metaClient.getFs(), basePath, instant);
+      }
+    });
+  }
+
+  public static void abortIfHeartbeatExpired(String instantTime, HoodieTable 
table,

Review comment:
       Please add Javadocs.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieHeartbeatException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.stream.Collectors;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the heartbeat folder where all writers are updating their 
heartbeats

Review comment:
       I guess this comment needs to move two lines down.

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/clean/SparkCleanActionExecutor.java
##########
@@ -131,4 +131,5 @@ public SparkCleanActionExecutor(HoodieSparkEngineContext 
context,
           .build();
     }).collect(Collectors.toList());
   }
+

Review comment:
       Can we revert this?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
##########
@@ -70,6 +72,8 @@ protected AbstractHoodieClient(HoodieEngineContext context, 
HoodieWriteConfig cl
     this.config = clientConfig;
     this.timelineServer = timelineServer;
     shouldStopTimelineServer = !timelineServer.isPresent();
+    this.heartbeatClient = new HoodieHeartbeatClient(this.fs, this.basePath,
+        clientConfig.getHoodieClientHeartbeatIntervalInMs(), 
clientConfig.getHoodieClientHeartbeatTolerableMisses());

Review comment:
       +1

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
##########
@@ -112,4 +114,25 @@ public static HoodieCleanerPlan 
getCleanerPlan(HoodieTableMetaClient metaClient,
   public static List<HoodieCleanFileInfo> 
convertToHoodieCleanFileInfoList(List<CleanFileInfo> cleanFileInfoList) {
     return 
cleanFileInfoList.stream().map(CleanFileInfo::toHoodieFileCleanInfo).collect(Collectors.toList());
   }
+
+  public static void rollbackFailedWrites(HoodieFailedWritesCleaningPolicy 
cleaningPolicy, String actionType,

Review comment:
       Please add Javadocs.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
##########
@@ -112,4 +114,25 @@ public static HoodieCleanerPlan 
getCleanerPlan(HoodieTableMetaClient metaClient,
   public static List<HoodieCleanFileInfo> 
convertToHoodieCleanFileInfoList(List<CleanFileInfo> cleanFileInfoList) {
     return 
cleanFileInfoList.stream().map(CleanFileInfo::toHoodieFileCleanInfo).collect(Collectors.toList());
   }
+
+  public static void rollbackFailedWrites(HoodieFailedWritesCleaningPolicy 
cleaningPolicy, String actionType,
+                                          Functions.Function0<Boolean> 
rollbackFailedWritesFunc) {
+    switch (actionType) {
+      case HoodieTimeline.CLEAN_ACTION:
+        if (cleaningPolicy.isEager()) {
+          // No need to do any special cleanup for failed operations during 
clean, compact etc
+          return;
+        }
+        else if (cleaningPolicy.isLazy()) {
+          rollbackFailedWritesFunc.apply();
+        }
+        break;
+      default:

Review comment:
       Why keep this loose? Why not do it only for COMMIT_ACTION? I see callers 
   only for these two action types. For any other action type, shouldn't we 
   throw or leave it as a no-op?
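
Roughly what I have in mind, as a sketch only. The enum, the string constants and the boolean "was the rollback invoked" return value are hypothetical; I am also assuming the current default branch rolls back only under the eager policy, which is not visible in this hunk.

```java
// Hypothetical stand-in for the dispatch in rollbackFailedWrites; not the Hudi API.
class RollbackDispatchSketch {
  enum Policy { EAGER, LAZY }

  // Placeholder values for the two action types that have callers today.
  static final String COMMIT_ACTION = "commit";
  static final String CLEAN_ACTION = "clean";

  // Returns true if the rollback function was invoked.
  static boolean rollbackFailedWrites(Policy policy, String actionType, Runnable rollbackFunc) {
    switch (actionType) {
      case CLEAN_ACTION:
        // Lazy policy: failed writes are rolled back as part of clean.
        if (policy == Policy.LAZY) {
          rollbackFunc.run();
          return true;
        }
        return false;
      case COMMIT_ACTION:
        // Assumed: eager policy rolls back before starting a new commit.
        if (policy == Policy.EAGER) {
          rollbackFunc.run();
          return true;
        }
        return false;
      default:
        // Any other action type is rejected instead of silently handled.
        throw new IllegalArgumentException("Unsupported action type: " + actionType);
    }
  }
}
```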

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/CleanerUtils.java
##########
@@ -112,4 +114,25 @@ public static HoodieCleanerPlan 
getCleanerPlan(HoodieTableMetaClient metaClient,
   public static List<HoodieCleanFileInfo> 
convertToHoodieCleanFileInfoList(List<CleanFileInfo> cleanFileInfoList) {
     return 
cleanFileInfoList.stream().map(CleanFileInfo::toHoodieFileCleanInfo).collect(Collectors.toList());
   }
+
+  public static void rollbackFailedWrites(HoodieFailedWritesCleaningPolicy 
cleaningPolicy, String actionType,
+                                          Functions.Function0<Boolean> 
rollbackFailedWritesFunc) {
+    switch (actionType) {
+      case HoodieTimeline.CLEAN_ACTION:
+        if (cleaningPolicy.isEager()) {
+          // No need to do any special cleanup for failed operations during 
clean, compact etc
+          return;
+        }
+        else if (cleaningPolicy.isLazy()) {
+          rollbackFailedWritesFunc.apply();
+        }
+        break;
+      default:

Review comment:
       Basically, why do we execute lines 131 to 135 for all action types 
   except CLEAN_ACTION?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
##########
@@ -48,6 +49,7 @@
   protected final transient Configuration hadoopConf;
   protected final HoodieWriteConfig config;
   protected final String basePath;
+  protected HoodieHeartbeatClient heartbeatClient;

Review comment:
       I see you have a public getter, so I'm wondering why this is protected 
   here. Is it because inherited classes could re-initialize it?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -617,11 +610,8 @@ public HoodieCleanMetadata clean() {
    * Provides a new commit time for a write operation (insert/update/delete).
    */
   public String startCommit() {
-    // NOTE : Need to ensure that rollback is done before a new commit is 
started
-    if (rollbackPending) {
-      // Only rollback pending commit/delta-commits. Do not touch compaction 
commits
-      rollbackPendingCommits();
-    }
+    CleanerUtils.rollbackFailedWrites(config.getFailedWritesCleanPolicy(),
+        HoodieTimeline.COMMIT_ACTION, () -> rollbackFailedWrites());

Review comment:
       If we do that, we don't need to call CleanerUtils.rollbackFailedWrites 
   in two different places (613, 643). I guess you might have run into some 
   issues; I'm interested to understand the reasoning.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class to delete heartbeat for completed or failed instants with 
expired heartbeats.
+ */
+public class HeartbeatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HeartbeatUtils.class);
+
+  /**
+   * Deletes the heartbeat file for the specified instant.
+   * @param fs
+   * @param basePath
+   * @param instantTime
+   * @return
+   */
+  public static boolean deleteHeartbeatFile(FileSystem fs, String basePath, 
String instantTime) {
+    boolean deleted = false;
+    try {
+      String heartbeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+      deleted = fs.delete(new Path(heartbeatFolderPath + File.separator + 
instantTime), false);
+      if (!deleted) {
+        LOG.error("Failed to delete heartbeat for instant " + instantTime);
+      }
+    } catch (IOException io) {
+      LOG.error("Unable to delete heartbeat for instant " + instantTime, io);

Review comment:
       Sorry, shouldn't we throw here?
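
Something along these lines, as a sketch under assumptions: `Deleter` is a hypothetical stand-in for the file system delete call, and `UncheckedIOException` is used only to keep the example dependency-free; the real code would presumably wrap in one of the Hoodie exception types.

```java
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the heartbeat delete helper; not the Hudi API.
class DeleteHeartbeatSketch {
  // Stand-in for a file system delete that may fail with an IOException.
  interface Deleter {
    boolean delete(String path) throws IOException;
  }

  static boolean deleteHeartbeatFile(Deleter fs, String heartbeatFolderPath, String instantTime) {
    String path = heartbeatFolderPath + "/" + instantTime;
    try {
      // A false result is passed back to the caller rather than treated as an error.
      return fs.delete(path);
    } catch (IOException io) {
      // Surface I/O failures instead of logging and continuing.
      throw new UncheckedIOException("Unable to delete heartbeat for instant " + instantTime, io);
    }
  }
}
```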

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -749,22 +739,49 @@ private HoodieTimeline 
getInflightTimelineExcludeCompactionAndClustering(HoodieT
   }
 
   /**
-   * Cleanup all pending commits.
+   * Rollback all failed writes.
    */
-  private void rollbackPendingCommits() {
+  public Boolean rollbackFailedWrites() {

Review comment:
       Do we make use of the return value somewhere? Also, I don't see us 
   returning false anywhere; exceptions will be thrown anyway, so I'm not sure 
   the return value adds much.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HeartbeatUtils.java
##########
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Helper class to delete heartbeat for completed or failed instants with 
expired heartbeats.
+ */
+public class HeartbeatUtils {
+
+  private static final Logger LOG = LogManager.getLogger(HeartbeatUtils.class);
+
+  /**
+   * Deletes the heartbeat file for the specified instant.
+   * @param fs
+   * @param basePath
+   * @param instantTime
+   * @return
+   */
+  public static boolean deleteHeartbeatFile(FileSystem fs, String basePath, 
String instantTime) {
+    boolean deleted = false;
+    try {
+      String heartbeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+      deleted = fs.delete(new Path(heartbeatFolderPath + File.separator + 
instantTime), false);
+      if (!deleted) {
+        LOG.error("Failed to delete heartbeat for instant " + instantTime);
+      }
+    } catch (IOException io) {
+      LOG.error("Unable to delete heartbeat for instant " + instantTime, io);
+    }
+    return deleted;
+  }
+
+  /**
+   * Deletes the heartbeat files for instants with expired heartbeats without 
any active instant.
+   * @param existingHeartbeatInstants
+   * @param metaClient
+   * @param basePath
+   */
+  public static void cleanExpiredHeartbeats(List<String> 
existingHeartbeatInstants,
+                                            HoodieTableMetaClient metaClient, 
String basePath) {
+    Set<String> heartbeatInstants = metaClient.getActiveTimeline()

Review comment:
       minor: activeHeartbeatInstants

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -749,22 +739,49 @@ private HoodieTimeline 
getInflightTimelineExcludeCompactionAndClustering(HoodieT
   }
 
   /**
-   * Cleanup all pending commits.
+   * Rollback all failed writes.
    */
-  private void rollbackPendingCommits() {
+  public Boolean rollbackFailedWrites() {
     HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    HoodieTimeline inflightTimeline = 
getInflightTimelineExcludeCompactionAndClustering(table);
-    List<String> commits = 
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
-        .collect(Collectors.toList());
-    for (String commit : commits) {
-      if (HoodieTimeline.compareTimestamps(commit, 
HoodieTimeline.LESSER_THAN_OR_EQUALS,
+    List<String> instantsToRollback = getInstantsToRollback(table);
+    for (String instant : instantsToRollback) {
+      if (HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS,
           HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
-        rollBackInflightBootstrap();
+        rollbackFailedBootstrap();
         break;
       } else {
-        rollback(commit);
+        rollback(instant);
       }
     }
+    // Delete any heartbeat files for already rolled back commits
+    try {
+      
HeartbeatUtils.cleanExpiredHeartbeats(this.heartbeatClient.getExistingHeartbeatInstantFileNames(),
+          createMetaClient(true), basePath);
+    } catch (IOException io) {
+      LOG.error("Unable to delete heartbeat files", io);
+    }
+    return true;
+  }
+
+  private List<String> getInstantsToRollback(HoodieTable<T, I, K, O> table) {
+    if (config.getFailedWritesCleanPolicy().isEager()) {

Review comment:
       Can we use a switch-case instead of multiple if-else branches?
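
A sketch of the shape I mean. The method body is cut off in this hunk, so the per-policy behaviour below (eager returns every inflight instant, lazy returns only those with an expired heartbeat) is my assumption, and the enum, parameters and predicate are hypothetical names.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Hypothetical stand-in for getInstantsToRollback; not the Hudi API.
class InstantsToRollbackSketch {
  enum Policy { EAGER, LAZY }

  static List<String> getInstantsToRollback(Policy policy, List<String> inflight,
                                            Predicate<String> heartbeatExpired) {
    switch (policy) {
      case EAGER:
        // Assumed: every inflight instant is a failed write.
        return inflight;
      case LAZY:
        // Assumed: only instants whose heartbeat has expired are rolled back.
        return inflight.stream().filter(heartbeatExpired).collect(Collectors.toList());
      default:
        throw new IllegalArgumentException("Invalid failed writes cleaning policy: " + policy);
    }
  }
}
```

The default branch also gives one obvious place to fail fast if a new policy is added later.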



