vinothchandar commented on a change in pull request #2359:
URL: https://github.com/apache/hudi/pull/2359#discussion_r549432592



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieClient.java
##########
@@ -70,6 +72,7 @@ protected AbstractHoodieClient(HoodieEngineContext context, 
HoodieWriteConfig cl
     this.config = clientConfig;
     this.timelineServer = timelineServer;
     shouldStopTimelineServer = !timelineServer.isPresent();
+    this.hoodieHeartbeatClient = new HoodieHeartbeatClient(this.fs, 
this.basePath, clientConfig.getHoodieClientHeartbeatIntervalInSecs());

Review comment:
       nit: can we rename this to just `heartbeatClient`?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -160,7 +180,9 @@ public boolean commit(String instantTime, O writeStatuses) {
   public boolean commit(String instantTime, O writeStatuses, 
Option<Map<String, String>> extraMetadata) {
     HoodieTableMetaClient metaClient = createMetaClient(false);
     String actionType = metaClient.getCommitActionType();
-    return commit(instantTime, writeStatuses, extraMetadata, actionType, 
Collections.emptyMap());
+    boolean isCommitted = commit(instantTime, writeStatuses, extraMetadata, 
actionType, Collections.emptyMap());
+    this.hoodieHeartbeatClient.stop(instantTime);

Review comment:
       Can we do this in postCommit(), so that all of these post-commit steps are in one place?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -875,5 +924,6 @@ public void close() {
     // Calling this here releases any resources used by your index, so make 
sure to finish any related operations
     // before this point
     this.index.close();
+    this.hoodieHeartbeatClient.stop();

Review comment:
       Rename this to `.close()` or `shutDown()` to differentiate it from the other usage, `stop(instantTime)`.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -712,24 +742,43 @@ public void rollbackInflightCompaction(HoodieInstant 
inflightInstant, HoodieTabl
   }
 
   /**
-   * Cleanup all pending commits.
+   * Rollback all failed commits.
    */
-  private void rollbackPendingCommits() {
+  public void rollbackFailedCommits() {
     HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
-    HoodieTimeline inflightTimeline = 
table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
-    List<String> commits = 
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
-        .collect(Collectors.toList());
-    for (String commit : commits) {
-      if (HoodieTimeline.compareTimestamps(commit, 
HoodieTimeline.LESSER_THAN_OR_EQUALS,
+    List<String> instantsToRollback = getInstantsToRollback(table);
+    for (String instant : instantsToRollback) {
+      if (HoodieTimeline.compareTimestamps(instant, 
HoodieTimeline.LESSER_THAN_OR_EQUALS,
           HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS)) {
-        rollBackInflightBootstrap();
+        rollbackFailedBootstrap();
         break;
       } else {
-        rollback(commit);
+        rollback(instant);
       }
     }
   }
 
+  private List<String> getInstantsToRollback(HoodieTable<T, I, K, O> table) {
+    if (config.getFailedWritesCleanPolicy() == 
HoodieCleaningPolicy.HoodieFailedWritesCleaningPolicy.EAGER) {
+      HoodieTimeline inflightTimeline = 
table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
+      return 
inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp)
+          .collect(Collectors.toList());
+    } else if (config.getFailedWritesCleanPolicy() == 
HoodieCleaningPolicy.HoodieFailedWritesCleaningPolicy.LAZY) {
+      return table.getMetaClient().getActiveTimeline()
+          
.getCommitsTimeline().filterInflights().getReverseOrderedInstants().filter(instant
 -> {
+            try {
+              return 
!hoodieHeartbeatClient.checkIfConcurrentWriterRunning(instant.getTimestamp());
+            } catch (IOException io) {
+              throw new HoodieException("Failed to check heartbeat for instant 
" + instant, io);
+            }
+          }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+    } else if (config.getFailedWritesCleanPolicy() == 
HoodieCleaningPolicy.HoodieFailedWritesCleaningPolicy.NEVER) {
+      return new ArrayList<>();

Review comment:
       nit: return `Collections.emptyList()` (the type-safe form of `Collections.EMPTY_LIST`) instead of `new ArrayList<>()`?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  // TODO : Throw new exception from thread
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the hearbeat folder where all writers are updating their 
heartbeats
+  private final transient FileSystem fs;
+  private final String basePath;
+  private String heartBeatFolderPath;
+  private String currentInstantTime;
+  private int numHeartBeatsForCurrentInstantTime;
+  // heartbeat interval in millis
+  private final long heartbeatIntervalInMillis;
+  private final transient ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+  private volatile boolean shutdownRequested;
+  private boolean isHearbeatStarted = false;
+  private boolean isHeatBeatStopped = false;
+  private transient Future<?> heartBeatFuture;
+  private Long lastHeartBeatTime;
+  // This is required for testing. Problem : if we set 
heartbeatIntervalInSeconds really large, test takes longer, if
+  // we set it small and you are debugging with breakpoint, causes issues. 
Need to implement a Mock in tests
+  private final Boolean skipHeartBeatCheck;
+
+  public HoodieHeartbeatClient(FileSystem fs, String basePath, long 
heartbeatIntervalInSeconds) {
+    ValidationUtils.checkArgument(heartbeatIntervalInSeconds >= 1, "Cannot set 
heartbeat lower than 1 second");
+    this.fs = fs;
+    this.basePath = basePath;
+    this.heartBeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+    this.heartbeatIntervalInMillis = heartbeatIntervalInSeconds * 1000L;
+    this.skipHeartBeatCheck = true;
+  }
+
+  public void start(String instantTime) {
+    LOG.info("Received request to start heartbeat for instant time " + 
instantTime);
+    if (isHearbeatStarted && instantTime == currentInstantTime) {
+      // heartbeat already started, NO_OP
+      return;
+    } else if (instantTime != currentInstantTime) {
+      if (this.currentInstantTime != null) {
+        LOG.warn("Stopping heartbeat for previous instant time " + 
this.currentInstantTime);
+        this.stop(this.currentInstantTime);
+      }
+      this.currentInstantTime = instantTime;
+      this.numHeartBeatsForCurrentInstantTime = 0;
+      this.shutdownRequested = false;
+    }
+    try {
+      updateHeartbeat(instantTime);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to generate heartbeat");
+    }
+    this.heartBeatFuture = executorService.submit(() -> {
+      while (!shutdownRequested) {
+        try {
+          Thread.sleep(this.heartbeatIntervalInMillis);
+          updateHeartbeat(instantTime);
+        } catch (InterruptedIOException | InterruptedException ioe) {
+          LOG.warn("Thread controlling heartbeat was interrupted");
+        } catch (IOException ioe) {
+          LOG.error("Unable to create heartbeat file", ioe);
+          throw new RuntimeException(ioe);
+        }
+      }
+    });
+    this.isHearbeatStarted = true;

Review comment:
       Then all we need to do is cancel the timers.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -615,6 +640,11 @@ public String startCommit() {
    * @param instantTime Instant time to be generated
    */
   public void startCommitWithTime(String instantTime) {
+    if (config.getFailedWritesCleanPolicy() == 
HoodieCleaningPolicy.HoodieFailedWritesCleaningPolicy.LAZY) {

Review comment:
       This sort of block is repeated in many places. Let's see if we can factor it out into some helper methods.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  // TODO : Throw new exception from thread
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the hearbeat folder where all writers are updating their 
heartbeats
+  private final transient FileSystem fs;
+  private final String basePath;
+  private String heartBeatFolderPath;
+  private String currentInstantTime;
+  private int numHeartBeatsForCurrentInstantTime;
+  // heartbeat interval in millis
+  private final long heartbeatIntervalInMillis;
+  private final transient ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+  private volatile boolean shutdownRequested;
+  private boolean isHearbeatStarted = false;
+  private boolean isHeatBeatStopped = false;
+  private transient Future<?> heartBeatFuture;
+  private Long lastHeartBeatTime;
+  // This is required for testing. Problem : if we set 
heartbeatIntervalInSeconds really large, test takes longer, if
+  // we set it small and you are debugging with breakpoint, causes issues. 
Need to implement a Mock in tests
+  private final Boolean skipHeartBeatCheck;
+
+  public HoodieHeartbeatClient(FileSystem fs, String basePath, long 
heartbeatIntervalInSeconds) {
+    ValidationUtils.checkArgument(heartbeatIntervalInSeconds >= 1, "Cannot set 
heartbeat lower than 1 second");
+    this.fs = fs;
+    this.basePath = basePath;
+    this.heartBeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+    this.heartbeatIntervalInMillis = heartbeatIntervalInSeconds * 1000L;
+    this.skipHeartBeatCheck = true;
+  }
+
+  public void start(String instantTime) {
+    LOG.info("Received request to start heartbeat for instant time " + 
instantTime);
+    if (isHearbeatStarted && instantTime == currentInstantTime) {
+      // heartbeat already started, NO_OP
+      return;
+    } else if (instantTime != currentInstantTime) {
+      if (this.currentInstantTime != null) {
+        LOG.warn("Stopping heartbeat for previous instant time " + 
this.currentInstantTime);
+        this.stop(this.currentInstantTime);
+      }
+      this.currentInstantTime = instantTime;
+      this.numHeartBeatsForCurrentInstantTime = 0;
+      this.shutdownRequested = false;
+    }
+    try {
+      updateHeartbeat(instantTime);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to generate heartbeat");
+    }
+    this.heartBeatFuture = executorService.submit(() -> {
+      while (!shutdownRequested) {
+        try {
+          Thread.sleep(this.heartbeatIntervalInMillis);
+          updateHeartbeat(instantTime);
+        } catch (InterruptedIOException | InterruptedException ioe) {
+          LOG.warn("Thread controlling heartbeat was interrupted");
+        } catch (IOException ioe) {
+          LOG.error("Unable to create heartbeat file", ioe);
+          throw new RuntimeException(ioe);
+        }
+      }
+    });
+    this.isHearbeatStarted = true;
+    this.isHeatBeatStopped = false;
+  }
+
+  public void stop(String instantTime) throws HoodieException {
+    this.shutdownRequested = true;
+    if (isHearbeatStarted && !isHeatBeatStopped && this.heartBeatFuture != 
null) {
+      // TODO : Substract the amount of time it takes to execute 
updateHeartbeat from the sleep time to avoid race conditions
+      boolean isCancelled = this.heartBeatFuture.cancel(true);
+      if (!isCancelled) {
+        throw new HoodieException("Could not stop heartbeat client");
+      }
+      this.isHeatBeatStopped = true;
+      LOG.info("Stopping heartbeat for instant " + instantTime);
+    }
+  }
+
+  public void stop() throws HoodieException {
+    if (this.currentInstantTime != null) {
+      stop(this.currentInstantTime);
+    }
+  }
+
+  public boolean delete(String instantTime) throws IOException {
+    // TODO ensure that we are not deleting the current instant heartbeat, 
this might need storing the instant time
+    // and implementing another heartbeat utils class for everything else
+    if (this.currentInstantTime == instantTime && !this.isHeatBeatStopped) {
+      LOG.error("Cannot delete a currently running heartbeat, stop it first");
+      return false;
+    } else {
+      boolean deleted = this.fs.delete(new Path(heartBeatFolderPath + 
File.separator + instantTime), false);
+      if (!deleted) {
+        LOG.error("Failed to delete heartbeat for instant " + instantTime);
+      }
+      return deleted;
+    }
+  }
+
+  public static Long getLastHeartbeat(FileSystem fs, String basePath, String 
instantTime) throws IOException {
+    Path heartBeatFilePath = new 
Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + File.separator + 
instantTime);
+    if (fs.exists(heartBeatFilePath)) {
+      return fs.getFileStatus(heartBeatFilePath).getModificationTime();
+    } else {
+      // NOTE : This can happen when a writer is upgraded to use lazy cleaning 
and the last write had failed
+      return 0L;
+    }
+  }
+
+  public boolean checkIfConcurrentWriterRunning(String instantTime) throws 
IOException {
+    long lastHeartBeatForWriter = getLastHeartbeat(fs, basePath, instantTime);
+    long currentTime = System.currentTimeMillis();
+    if (currentTime - lastHeartBeatForWriter > this.heartbeatIntervalInMillis) 
{

Review comment:
       The heartbeats won't align exactly with the interval, because they can be delayed (for example by CPU contention on the driver). So a single late heartbeat should not be taken to mean the writer has died. Typically we add another config whose value multiplies `this.heartbeatIntervalInMillis`, and that value should be at least 2. I am saying we should compare `currentTime - lastHeartBeatForWriter > this.numTolerableHeartbeatMisses * this.heartbeatIntervalInMillis`.
   
   `numTolerableHeartbeatMisses` should be its own config, with a default value of 2.
   
   

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -1367,6 +1410,77 @@ public void 
testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean enableO
     testRollbackAfterConsistencyCheckFailureUsingFileList(true, 
enableOptimisticConsistencyGuard);
   }
 
+  @Test
+  public void testParallelWriting() throws Exception {
+    ExecutorService service = Executors.newFixedThreadPool(3);
+    String instantTime1 = "100";
+    String instantTime2 = "200";
+    String instantTime3 = "300";
+    Boolean doCommit = true;
+    Boolean doClean = false;
+    HoodieTestUtils.init(hadoopConf, basePath);
+    Future<Boolean> commit1 = service.submit(() -> 
doWritesToTable(instantTime1, doCommit, doClean));
+    Future<Boolean> commit2 = service.submit(() -> 
doWritesToTable(instantTime2, doCommit, doClean));
+    Future<Boolean> commit3 = service.submit(() -> 
doWritesToTable(instantTime3, doCommit, doClean));
+    assertTrue(commit1.get() && commit2.get() && commit3.get());
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
basePath);
+    
assertTrue(metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().countInstants()
 == 3);
+  }
+
+  @Test
+  public void testParallelWritingWhileCleaningPreviousFailedCommits() throws 
Exception {
+    ExecutorService service = Executors.newFixedThreadPool(2);
+    String instantTime1 = "100";
+    String instantTime2 = "200";
+    String instantTime3 = "300";
+    String instantTime4 = "400";
+    Boolean doCommit = false;
+    Boolean doClean = false;
+    HoodieTestUtils.init(hadoopConf, basePath);
+    // Perform 2 failed writes to table
+    doWritesToTable(instantTime1, doCommit, doClean);
+    doWritesToTable(instantTime2, doCommit, doClean);
+    Future<Boolean> commit3 = service.submit(() -> 
doWritesToTable(instantTime3, true, false));
+    assertTrue(commit3.get());
+    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, 
basePath);
+    assertTrue(metaClient.getActiveTimeline().getTimelineOfActions(
+        
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 
0);
+    
assertTrue(metaClient.getActiveTimeline().filterInflights().countInstants() == 
2);
+    // Sleep for 5 secs to allow for heart beat interval to pass
+    Thread.sleep(5000);
+    SparkRDDWriteClient client = new SparkRDDWriteClient(context, 
getParallelWritingWriteConfig());
+    Future<Boolean> commit4 = service.submit(() -> 
doWritesToTable(instantTime4, true, false));
+    Future<HoodieCleanMetadata> clean1 = service.submit(() -> client.clean());
+    assertTrue(commit4.get());
+    clean1.get();
+    assertTrue(metaClient.getActiveTimeline().reload().getTimelineOfActions(
+        
CollectionUtils.createSet(HoodieTimeline.ROLLBACK_ACTION)).countInstants() == 
2);
+  }
+
+  private boolean doWritesToTable(String instantTime, Boolean doCommit, 
Boolean doClean) {
+    HoodieWriteConfig cfg = getParallelWritingWriteConfig();
+    // Create a new write client local to thread
+    SparkRDDWriteClient client = new SparkRDDWriteClient(context, cfg);
+
+    client.startCommitWithTime(instantTime);
+
+    List<HoodieRecord> records0 = dataGen.generateInserts(instantTime, 200);
+    JavaRDD<HoodieRecord> writeRecords0 = jsc.parallelize(records0, 1);
+    JavaRDD<WriteStatus> result0 = client.bulkInsert(writeRecords0, 
instantTime);

Review comment:
       Should we upsert some of the writes, so that we get a better mix of operations?

##########
File path: 
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieHeartbeatClient extends HoodieCommonTestHarness {
+
+  private static String instantTime1 = "100";
+  private static String instantTime2 = "101";
+
+  @BeforeEach
+  public void init() throws IOException {
+    initMetaClient();
+  }
+
+  @Test
+  public void testStartHeartbeat() throws IOException {
+    int heartBeatInterval = 1;
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(metaClient.getFs(), 
metaClient.getBasePath(), heartBeatInterval);
+    hoodieHeartbeatClient.start(instantTime1);
+    FileStatus [] fs = metaClient.getFs().listStatus(new 
Path(hoodieHeartbeatClient.getHeartBeatFolderPath()));
+    assertTrue(fs.length == 1);
+    assertTrue(fs[0].getPath().toString().contains(instantTime1));
+  }
+
+  @Test
+  public void testStopHeartbeat() throws IOException, InterruptedException {
+    int heartBeatInterval = 1;
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(metaClient.getFs(), 
metaClient.getBasePath(), heartBeatInterval);
+    hoodieHeartbeatClient.start(instantTime1);
+    hoodieHeartbeatClient.stop(instantTime1);
+    Long waitTimeInMs = 2000L;
+    Thread.sleep(waitTimeInMs);
+    assertTrue(hoodieHeartbeatClient.getNumHeartBeatsForCurrentInstantTime() > 
0);
+    Long lastHeartBeat = 
HoodieHeartbeatClient.getLastHeartbeat(metaClient.getFs(), basePath, 
instantTime1);
+    assertTrue(System.currentTimeMillis() - lastHeartBeat >= waitTimeInMs);
+  }
+
+  @Test
+  public void testNumHeartbeatsGenerated() throws InterruptedException {
+    int heartBeatInterval = 1;
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(metaClient.getFs(), 
metaClient.getBasePath(), heartBeatInterval);
+    hoodieHeartbeatClient.start("100");
+    // wait 10 heartbeats
+    Thread.sleep(10 * heartBeatInterval * 1000);

Review comment:
       No `Thread.sleep` in tests, please; sleeps make tests flaky. Let's structure the tests to wait until a condition X is reached, with a timeout. The essence is explained here: http://www.awaitility.org/

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/MultiWriterConcurrencyModel.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+public enum MultiWriterConcurrencyModel {
+  LEGACY_SINGLE_WRITER,
+  EMBARRASSINGLY_PARALLEL,

Review comment:
       Rename this to just `PARALLEL_NO_CONTENTION`.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -232,17 +254,18 @@ void emitCommitMetrics(String instantTime, 
HoodieCommitMetadata metadata, String
    * Main API to run bootstrap to hudi.
    */
   public void bootstrap(Option<Map<String, String>> extraMetadata) {
-    if (rollbackPending) {
-      rollBackInflightBootstrap();
-    }
+    // TODO : MULTIWRITER -> check if failed bootstrap files can be cleaned 
later

Review comment:
       A bootstrap reattempt will do this, IIRC.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
##########
@@ -140,6 +144,10 @@
   public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = 
AVRO_SCHEMA + ".externalTransformation";
   public static final String DEFAULT_EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION 
= "false";
 
+  public static final String MULTI_WRITER_CONCURRENCY_MODEL_PROP =

Review comment:
       Rename this to `WRITER_CONCURRENCY_MODE`, i.e. "mode" instead of "model"; it's more understandable for users. The `MULTI` prefix seems redundant.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -712,24 +742,43 @@ public void rollbackInflightCompaction(HoodieInstant 
inflightInstant, HoodieTabl
   }
 
   /**
-   * Cleanup all pending commits.
+   * Rollback all failed commits.
    */
-  private void rollbackPendingCommits() {
+  public void rollbackFailedCommits() {

Review comment:
       Why is this public now?

##########
File path: 
hudi-cli/src/main/java/org/apache/hudi/cli/commands/FileSystemViewCommand.java
##########
@@ -249,7 +249,7 @@ private HoodieTableFileSystemView 
buildFileSystemView(String globRegex, String m
     } else if (excludeCompaction) {
       timeline = metaClient.getActiveTimeline().getCommitsTimeline();
     } else {
-      timeline = 
metaClient.getActiveTimeline().getCommitsAndCompactionTimeline();
+      timeline = 
metaClient.getActiveTimeline().getCommitsCompactionAndReplaceTimeline();

Review comment:
       These seem like fixes we should make regardless? cc @satishkotha

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  // TODO : Throw new exception from thread
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the hearbeat folder where all writers are updating their 
heartbeats
+  private final transient FileSystem fs;
+  private final String basePath;
+  private String heartBeatFolderPath;
+  private String currentInstantTime;
+  private int numHeartBeatsForCurrentInstantTime;
+  // heartbeat interval in millis
+  private final long heartbeatIntervalInMillis;
+  private final transient ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+  private volatile boolean shutdownRequested;
+  private boolean isHearbeatStarted = false;
+  private boolean isHeatBeatStopped = false;
+  private transient Future<?> heartBeatFuture;
+  private Long lastHeartBeatTime;
+  // This is required for testing. Problem : if we set 
heartbeatIntervalInSeconds really large, test takes longer, if
+  // we set it small and you are debugging with breakpoint, causes issues. 
Need to implement a Mock in tests
+  private final Boolean skipHeartBeatCheck;
+
+  public HoodieHeartbeatClient(FileSystem fs, String basePath, long 
heartbeatIntervalInSeconds) {
+    ValidationUtils.checkArgument(heartbeatIntervalInSeconds >= 1, "Cannot set 
heartbeat lower than 1 second");
+    this.fs = fs;
+    this.basePath = basePath;
+    this.heartBeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+    this.heartbeatIntervalInMillis = heartbeatIntervalInSeconds * 1000L;

Review comment:
       Why not just keep the config at the millisecond level? I think that's better.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieCompactionConfig.java
##########
@@ -39,6 +39,7 @@
 public class HoodieCompactionConfig extends DefaultHoodieConfig {
 
   public static final String CLEANER_POLICY_PROP = "hoodie.cleaner.policy";
+  public static final String FAILED_WRITES_CLEANER_POLICY_PROP = 
"hoodie.failed.writes.cleaner.policy";

Review comment:
       Please move this property closer to where its default value is defined.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  // TODO : Throw new exception from thread
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the hearbeat folder where all writers are updating their 
heartbeats
+  private final transient FileSystem fs;
+  private final String basePath;
+  private String heartBeatFolderPath;
+  private String currentInstantTime;
+  private int numHeartBeatsForCurrentInstantTime;
+  // heartbeat interval in millis
+  private final long heartbeatIntervalInMillis;
+  private final transient ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+  private volatile boolean shutdownRequested;
+  private boolean isHearbeatStarted = false;
+  private boolean isHeatBeatStopped = false;
+  private transient Future<?> heartBeatFuture;
+  private Long lastHeartBeatTime;
+  // This is required for testing. Problem : if we set 
heartbeatIntervalInSeconds really large, test takes longer, if
+  // we set it small and you are debugging with breakpoint, causes issues. 
Need to implement a Mock in tests
+  private final Boolean skipHeartBeatCheck;

Review comment:
       `skipHeartBeatCheck` is always set to `true` in the constructor. Is that intended?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  // TODO : Throw new exception from thread
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the hearbeat folder where all writers are updating their 
heartbeats
+  private final transient FileSystem fs;
+  private final String basePath;
+  private String heartBeatFolderPath;
+  private String currentInstantTime;
+  private int numHeartBeatsForCurrentInstantTime;
+  // heartbeat interval in millis
+  private final long heartbeatIntervalInMillis;
+  private final transient ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+  private volatile boolean shutdownRequested;
+  private boolean isHearbeatStarted = false;
+  private boolean isHeatBeatStopped = false;
+  private transient Future<?> heartBeatFuture;
+  private Long lastHeartBeatTime;
+  // This is required for testing. Problem : if we set 
heartbeatIntervalInSeconds really large, test takes longer, if
+  // we set it small and you are debugging with breakpoint, causes issues. 
Need to implement a Mock in tests
+  private final Boolean skipHeartBeatCheck;
+
+  public HoodieHeartbeatClient(FileSystem fs, String basePath, long 
heartbeatIntervalInSeconds) {
+    ValidationUtils.checkArgument(heartbeatIntervalInSeconds >= 1, "Cannot set 
heartbeat lower than 1 second");
+    this.fs = fs;
+    this.basePath = basePath;
+    this.heartBeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+    this.heartbeatIntervalInMillis = heartbeatIntervalInSeconds * 1000L;
+    this.skipHeartBeatCheck = true;
+  }
+
+  public void start(String instantTime) {
+    LOG.info("Received request to start heartbeat for instant time " + 
instantTime);
+    if (isHearbeatStarted && instantTime == currentInstantTime) {
+      // heartbeat already started, NO_OP
+      return;
+    } else if (instantTime != currentInstantTime) {
+      if (this.currentInstantTime != null) {
+        LOG.warn("Stopping heartbeat for previous instant time " + 
this.currentInstantTime);
+        this.stop(this.currentInstantTime);
+      }
+      this.currentInstantTime = instantTime;
+      this.numHeartBeatsForCurrentInstantTime = 0;
+      this.shutdownRequested = false;
+    }
+    try {
+      updateHeartbeat(instantTime);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to generate heartbeat");
+    }
+    this.heartBeatFuture = executorService.submit(() -> {
+      while (!shutdownRequested) {
+        try {
+          Thread.sleep(this.heartbeatIntervalInMillis);
+          updateHeartbeat(instantTime);
+        } catch (InterruptedIOException | InterruptedException ioe) {
+          LOG.warn("Thread controlling heartbeat was interrupted");
+        } catch (IOException ioe) {
+          LOG.error("Unable to create heartbeat file", ioe);
+          throw new RuntimeException(ioe);
+        }
+      }
+    });
+    this.isHearbeatStarted = true;
+    this.isHeatBeatStopped = false;
+  }
+
+  public void stop(String instantTime) throws HoodieException {
+    this.shutdownRequested = true;
+    if (isHearbeatStarted && !isHeatBeatStopped && this.heartBeatFuture != 
null) {
+      // TODO : Substract the amount of time it takes to execute 
updateHeartbeat from the sleep time to avoid race conditions
+      boolean isCancelled = this.heartBeatFuture.cancel(true);
+      if (!isCancelled) {
+        throw new HoodieException("Could not stop heartbeat client");
+      }
+      this.isHeatBeatStopped = true;
+      LOG.info("Stopping heartbeat for instant " + instantTime);
+    }
+  }
+
+  public void stop() throws HoodieException {
+    if (this.currentInstantTime != null) {
+      stop(this.currentInstantTime);
+    }
+  }
+
+  public boolean delete(String instantTime) throws IOException {
+    // TODO ensure that we are not deleting the current instant heartbeat, 
this might need storing the instant time
+    // and implementing another heartbeat utils class for everything else
+    if (this.currentInstantTime == instantTime && !this.isHeatBeatStopped) {
+      LOG.error("Cannot delete a currently running heartbeat, stop it first");
+      return false;
+    } else {
+      boolean deleted = this.fs.delete(new Path(heartBeatFolderPath + 
File.separator + instantTime), false);
+      if (!deleted) {
+        LOG.error("Failed to delete heartbeat for instant " + instantTime);
+      }
+      return deleted;
+    }
+  }
+
+  public static Long getLastHeartbeat(FileSystem fs, String basePath, String 
instantTime) throws IOException {
+    Path heartBeatFilePath = new 
Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + File.separator + 
instantTime);
+    if (fs.exists(heartBeatFilePath)) {
+      return fs.getFileStatus(heartBeatFilePath).getModificationTime();
+    } else {
+      // NOTE : This can happen when a writer is upgraded to use lazy cleaning 
and the last write had failed
+      return 0L;
+    }
+  }
+
+  public boolean checkIfConcurrentWriterRunning(String instantTime) throws 
IOException {
+    long lastHeartBeatForWriter = getLastHeartbeat(fs, basePath, instantTime);
+    long currentTime = System.currentTimeMillis();
+    if (currentTime - lastHeartBeatForWriter > this.heartbeatIntervalInMillis) 
{

Review comment:
       Could this be a single line: `return currentTime - lastHeartBeatForWriter <= this.heartbeatIntervalInMillis;`?

##########
File path: 
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/heartbeat/TestHoodieHeartbeatClient.java
##########
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestHoodieHeartbeatClient extends HoodieCommonTestHarness {
+
+  private static String instantTime1 = "100";
+  private static String instantTime2 = "101";
+
+  @BeforeEach
+  public void init() throws IOException {
+    initMetaClient();
+  }
+
+  @Test
+  public void testStartHeartbeat() throws IOException {
+    int heartBeatInterval = 1;
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(metaClient.getFs(), 
metaClient.getBasePath(), heartBeatInterval);
+    hoodieHeartbeatClient.start(instantTime1);
+    FileStatus [] fs = metaClient.getFs().listStatus(new 
Path(hoodieHeartbeatClient.getHeartBeatFolderPath()));
+    assertTrue(fs.length == 1);
+    assertTrue(fs[0].getPath().toString().contains(instantTime1));
+  }
+
+  @Test
+  public void testStopHeartbeat() throws IOException, InterruptedException {
+    int heartBeatInterval = 1;
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(metaClient.getFs(), 
metaClient.getBasePath(), heartBeatInterval);
+    hoodieHeartbeatClient.start(instantTime1);
+    hoodieHeartbeatClient.stop(instantTime1);
+    Long waitTimeInMs = 2000L;
+    Thread.sleep(waitTimeInMs);
+    assertTrue(hoodieHeartbeatClient.getNumHeartBeatsForCurrentInstantTime() > 
0);
+    Long lastHeartBeat = 
HoodieHeartbeatClient.getLastHeartbeat(metaClient.getFs(), basePath, 
instantTime1);
+    assertTrue(System.currentTimeMillis() - lastHeartBeat >= waitTimeInMs);
+  }
+
+  @Test
+  public void testNumHeartbeatsGenerated() throws InterruptedException {
+    int heartBeatInterval = 1;
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(metaClient.getFs(), 
metaClient.getBasePath(), heartBeatInterval);
+    hoodieHeartbeatClient.start("100");
+    // wait 10 heartbeats
+    Thread.sleep(10 * heartBeatInterval * 1000);
+    assertTrue(hoodieHeartbeatClient.getNumHeartBeatsForCurrentInstantTime() 
>= 10);
+  }
+
+  @Test
+  public void testLastHeartBeatTime() throws IOException, InterruptedException 
{
+    int heartBeatInterval = 1;
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(metaClient.getFs(), 
metaClient.getBasePath(), heartBeatInterval);
+    hoodieHeartbeatClient.start("100");
+    // wait 10 heartbeats
+    Thread.sleep(1 * heartBeatInterval * 1000);
+    Long startHeartbeat = 
hoodieHeartbeatClient.getLastHeartbeat(metaClient.getFs(), basePath, "100");
+    Thread.sleep(9 * heartBeatInterval * 1000);
+    hoodieHeartbeatClient.stop("100");
+    Long endHeartbeat = 
HoodieHeartbeatClient.getLastHeartbeat(metaClient.getFs(), basePath, "100");
+    // check if the heartbeat time is greater than start time + 10 heartbeat 
intervals
+    assertTrue((endHeartbeat - startHeartbeat) >= (9 * heartBeatInterval * 
1000));
+  }
+
+  @Test
+  public void testDeleteWrongHeartbeat() throws IOException {
+    int heartBeatInterval = 1;
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(metaClient.getFs(), 
metaClient.getBasePath(), heartBeatInterval);
+    hoodieHeartbeatClient.start(instantTime1);
+    hoodieHeartbeatClient.stop(instantTime1);
+    assertFalse(hoodieHeartbeatClient.delete(instantTime2));
+  }
+
+  @Test
+  public void testDeleteRunningHeartbeat() throws IOException {
+    int heartBeatInterval = 1;
+    HoodieHeartbeatClient hoodieHeartbeatClient =
+        new HoodieHeartbeatClient(metaClient.getFs(), 
metaClient.getBasePath(), heartBeatInterval);
+    hoodieHeartbeatClient.start(instantTime1);
+    assertFalse(hoodieHeartbeatClient.delete(instantTime1));
+  }
+
+  @Test
+  public void testConcurrentReadsAndWritesToSameFile() throws 
InterruptedException {
+    Thread th1 = new Thread(() -> {
+      try {
+        while (true) {
+          OutputStream outputStream = metaClient.getFs().create(new 
Path(metaClient.getBasePath() + "/test"), true);
+          outputStream.write("testing".getBytes());
+          outputStream.close();
+        }
+      } catch (Exception e) {
+        System.out.println("Caught Exception th1");
+        e.printStackTrace();
+      }
+    });
+
+    Thread th2 = new Thread(() -> {
+      try {
+        while (true) {
+          FileStatus fs = metaClient.getFs().getFileStatus(new 
Path(metaClient.getBasePath() + "/test"));
+          System.out.println("Modification Time => " + 
fs.getModificationTime());
+        }
+      } catch (Exception e) {
+        System.out.println("Caught Exception th2");

Review comment:
       Use a logger here instead of `System.out.println` / `printStackTrace()`?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  // TODO : Throw new exception from thread
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the hearbeat folder where all writers are updating their 
heartbeats
+  private final transient FileSystem fs;
+  private final String basePath;
+  private String heartBeatFolderPath;
+  private String currentInstantTime;
+  private int numHeartBeatsForCurrentInstantTime;
+  // heartbeat interval in millis
+  private final long heartbeatIntervalInMillis;
+  private final transient ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+  private volatile boolean shutdownRequested;
+  private boolean isHearbeatStarted = false;
+  private boolean isHeatBeatStopped = false;
+  private transient Future<?> heartBeatFuture;
+  private Long lastHeartBeatTime;
+  // This is required for testing. Problem : if we set 
heartbeatIntervalInSeconds really large, test takes longer, if
+  // we set it small and you are debugging with breakpoint, causes issues. 
Need to implement a Mock in tests
+  private final Boolean skipHeartBeatCheck;
+
+  public HoodieHeartbeatClient(FileSystem fs, String basePath, long 
heartbeatIntervalInSeconds) {
+    ValidationUtils.checkArgument(heartbeatIntervalInSeconds >= 1, "Cannot set 
heartbeat lower than 1 second");
+    this.fs = fs;
+    this.basePath = basePath;
+    this.heartBeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+    this.heartbeatIntervalInMillis = heartbeatIntervalInSeconds * 1000L;
+    this.skipHeartBeatCheck = true;
+  }
+
+  public void start(String instantTime) {
+    LOG.info("Received request to start heartbeat for instant time " + 
instantTime);
+    if (isHearbeatStarted && instantTime == currentInstantTime) {
+      // heartbeat already started, NO_OP
+      return;
+    } else if (instantTime != currentInstantTime) {
+      if (this.currentInstantTime != null) {
+        LOG.warn("Stopping heartbeat for previous instant time " + 
this.currentInstantTime);
+        this.stop(this.currentInstantTime);
+      }
+      this.currentInstantTime = instantTime;

Review comment:
       So this assumes that a single write client won't have multiple concurrent writes, i.e. it will issue `start(instantTime)` for only one instant at any time. Can we use a Map keyed by instantTime and maintain per-instant state, so that the heartbeat client can handle multiple instant times at once?

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  // TODO : Throw new exception from thread
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the hearbeat folder where all writers are updating their 
heartbeats
+  private final transient FileSystem fs;
+  private final String basePath;
+  private String heartBeatFolderPath;
+  private String currentInstantTime;
+  private int numHeartBeatsForCurrentInstantTime;
+  // heartbeat interval in millis
+  private final long heartbeatIntervalInMillis;
+  private final transient ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+  private volatile boolean shutdownRequested;
+  private boolean isHearbeatStarted = false;
+  private boolean isHeatBeatStopped = false;
+  private transient Future<?> heartBeatFuture;
+  private Long lastHeartBeatTime;
+  // This is required for testing. Problem : if we set 
heartbeatIntervalInSeconds really large, test takes longer, if
+  // we set it small and you are debugging with breakpoint, causes issues. 
Need to implement a Mock in tests
+  private final Boolean skipHeartBeatCheck;
+
+  public HoodieHeartbeatClient(FileSystem fs, String basePath, long 
heartbeatIntervalInSeconds) {
+    ValidationUtils.checkArgument(heartbeatIntervalInSeconds >= 1, "Cannot set 
heartbeat lower than 1 second");
+    this.fs = fs;
+    this.basePath = basePath;
+    this.heartBeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+    this.heartbeatIntervalInMillis = heartbeatIntervalInSeconds * 1000L;
+    this.skipHeartBeatCheck = true;
+  }
+
+  public void start(String instantTime) {
+    LOG.info("Received request to start heartbeat for instant time " + 
instantTime);
+    if (isHearbeatStarted && instantTime == currentInstantTime) {
+      // heartbeat already started, NO_OP
+      return;
+    } else if (instantTime != currentInstantTime) {
+      if (this.currentInstantTime != null) {
+        LOG.warn("Stopping heartbeat for previous instant time " + 
this.currentInstantTime);
+        this.stop(this.currentInstantTime);
+      }
+      this.currentInstantTime = instantTime;
+      this.numHeartBeatsForCurrentInstantTime = 0;
+      this.shutdownRequested = false;
+    }
+    try {
+      updateHeartbeat(instantTime);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to generate heartbeat");
+    }
+    this.heartBeatFuture = executorService.submit(() -> {

Review comment:
       Can we use a `TimerTask` or a scheduled executor (e.g. `ScheduledExecutorService`)? It can also support multiple such schedules.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/MultiWriterConcurrencyModel.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+public enum MultiWriterConcurrencyModel {

Review comment:
       Rename to `WriteConcurrencyMode`.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieCleaningPolicy.java
##########
@@ -22,5 +22,8 @@
  * Hoodie cleaning policies.
  */
 public enum HoodieCleaningPolicy {
-  KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_COMMITS
+  KEEP_LATEST_FILE_VERSIONS, KEEP_LATEST_COMMITS;
+  public enum HoodieFailedWritesCleaningPolicy {

Review comment:
       Let's make this a top-level enum instead of a nested one.

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  // TODO : Throw new exception from thread
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the hearbeat folder where all writers are updating their 
heartbeats
+  private final transient FileSystem fs;
+  private final String basePath;
+  private String heartBeatFolderPath;
+  private String currentInstantTime;
+  private int numHeartBeatsForCurrentInstantTime;
+  // heartbeat interval in millis
+  private final long heartbeatIntervalInMillis;
+  private final transient ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+  private volatile boolean shutdownRequested;
+  private boolean isHearbeatStarted = false;
+  private boolean isHeatBeatStopped = false;
+  private transient Future<?> heartBeatFuture;
+  private Long lastHeartBeatTime;
+  // This is required for testing. Problem : if we set 
heartbeatIntervalInSeconds really large, test takes longer, if
+  // we set it small and you are debugging with breakpoint, causes issues. 
Need to implement a Mock in tests
+  private final Boolean skipHeartBeatCheck;
+
+  public HoodieHeartbeatClient(FileSystem fs, String basePath, long 
heartbeatIntervalInSeconds) {
+    ValidationUtils.checkArgument(heartbeatIntervalInSeconds >= 1, "Cannot set 
heartbeat lower than 1 second");
+    this.fs = fs;
+    this.basePath = basePath;
+    this.heartBeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+    this.heartbeatIntervalInMillis = heartbeatIntervalInSeconds * 1000L;
+    this.skipHeartBeatCheck = true;
+  }
+
+  public void start(String instantTime) {
+    LOG.info("Received request to start heartbeat for instant time " + 
instantTime);
+    if (isHearbeatStarted && instantTime == currentInstantTime) {
+      // heartbeat already started, NO_OP
+      return;
+    } else if (instantTime != currentInstantTime) {
+      if (this.currentInstantTime != null) {
+        LOG.warn("Stopping heartbeat for previous instant time " + 
this.currentInstantTime);
+        this.stop(this.currentInstantTime);
+      }
+      this.currentInstantTime = instantTime;
+      this.numHeartBeatsForCurrentInstantTime = 0;
+      this.shutdownRequested = false;
+    }
+    try {
+      updateHeartbeat(instantTime);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to generate heartbeat");
+    }
+    this.heartBeatFuture = executorService.submit(() -> {
+      while (!shutdownRequested) {
+        try {
+          Thread.sleep(this.heartbeatIntervalInMillis);
+          updateHeartbeat(instantTime);
+        } catch (InterruptedIOException | InterruptedException ioe) {
+          LOG.warn("Thread controlling heartbeat was interrupted");
+        } catch (IOException ioe) {
+          LOG.error("Unable to create heartbeat file", ioe);
+          throw new RuntimeException(ioe);
+        }
+      }
+    });
+    this.isHearbeatStarted = true;
+    this.isHeatBeatStopped = false;
+  }
+
+  public void stop(String instantTime) throws HoodieException {
+    this.shutdownRequested = true;
+    if (isHearbeatStarted && !isHeatBeatStopped && this.heartBeatFuture != 
null) {
+      // TODO : Substract the amount of time it takes to execute 
updateHeartbeat from the sleep time to avoid race conditions
+      boolean isCancelled = this.heartBeatFuture.cancel(true);
+      if (!isCancelled) {
+        throw new HoodieException("Could not stop heartbeat client");
+      }
+      this.isHeatBeatStopped = true;
+      LOG.info("Stopping heartbeat for instant " + instantTime);
+    }
+  }
+
+  public void stop() throws HoodieException {
+    if (this.currentInstantTime != null) {
+      stop(this.currentInstantTime);
+    }
+  }
+
+  public boolean delete(String instantTime) throws IOException {
+    // TODO ensure that we are not deleting the current instant heartbeat, 
this might need storing the instant time
+    // and implementing another heartbeat utils class for everything else
+    if (this.currentInstantTime == instantTime && !this.isHeatBeatStopped) {
+      LOG.error("Cannot delete a currently running heartbeat, stop it first");
+      return false;
+    } else {
+      boolean deleted = this.fs.delete(new Path(heartBeatFolderPath + 
File.separator + instantTime), false);
+      if (!deleted) {
+        LOG.error("Failed to delete heartbeat for instant " + instantTime);
+      }
+      return deleted;
+    }
+  }
+
+  public static Long getLastHeartbeat(FileSystem fs, String basePath, String 
instantTime) throws IOException {
+    Path heartBeatFilePath = new 
Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + File.separator + 
instantTime);
+    if (fs.exists(heartBeatFilePath)) {
+      return fs.getFileStatus(heartBeatFilePath).getModificationTime();
+    } else {
+      // NOTE : This can happen when a writer is upgraded to use lazy cleaning 
and the last write had failed
+      return 0L;
+    }
+  }
+
+  public boolean checkIfConcurrentWriterRunning(String instantTime) throws 
IOException {
+    long lastHeartBeatForWriter = getLastHeartbeat(fs, basePath, instantTime);
+    long currentTime = System.currentTimeMillis();
+    if (currentTime - lastHeartBeatForWriter > this.heartbeatIntervalInMillis) 
{
+      return false;
+    }
+    return true;
+  }
+
+  private void updateHeartbeat(String instantTime) throws IOException {
+    Long newHeartBeatTime = System.currentTimeMillis();
+    OutputStream outputStream =

Review comment:
       AFAIK, creating a new file should be atomic in both HDFS and cloud stores, but I'm unsure how the overwrite case works.
   
   Can you clarify? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
##########
@@ -107,7 +107,7 @@ public HoodieTimeline 
filterCompletedAndCompactionInstants() {
   }
 
   @Override
-  public HoodieDefaultTimeline getCommitsAndCompactionTimeline() {
+  public HoodieDefaultTimeline getCommitsCompactionAndReplaceTimeline() {

Review comment:
       Got it, so this is a rename. Discard my earlier comment. 

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -1367,6 +1410,77 @@ public void 
testRollbackAfterConsistencyCheckFailureUsingMarkers(boolean enableO
     testRollbackAfterConsistencyCheckFailureUsingFileList(true, 
enableOptimisticConsistencyGuard);
   }
 
+  @Test
+  public void testParallelWriting() throws Exception {
+    ExecutorService service = Executors.newFixedThreadPool(3);
+    String instantTime1 = "100";

Review comment:
       Let's inline these variables. They can all just be constants, right?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
##########
@@ -145,6 +146,11 @@ public void scan() {
           // hit a block with instant time greater than should be processed, 
stop processing further
           break;
         }
+        if (r.getBlockType() != CORRUPT_BLOCK && r.getBlockType() != 
COMMAND_BLOCK
+            && 
!timeline.containsOrBeforeTimelineStarts(r.getLogBlockHeader().get(INSTANT_TIME)))
 {
+          // hit an invalid block possibly from a failed write, move to the 
next one and skip processing this one

Review comment:
       This is not an invalid block, right? Isn't this skipping an uncommitted block? 

##########
File path: 
hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/TestHoodieROTablePathFilter.java
##########
@@ -84,7 +84,6 @@ public void testNonHoodiePaths() throws IOException {
     assertTrue(pathFilter.accept(new Path(path2.toUri())));
   }
 
-  @Test
   public void testPartitionPathsAsNonHoodiePaths() throws Exception {

Review comment:
       Why is this test being removed?

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -437,7 +456,10 @@ private void testUpsertsInternal(HoodieWriteConfig config,
    */
   @Test
   public void testDeletes() throws Exception {
-    SparkRDDWriteClient client = getHoodieWriteClient(getConfig(), false);
+    // Set cleaner to LAZY so no inflights are cleaned

Review comment:
       Why can't we override this once for all tests? If we run everything with 
`LAZY`, then we can have better confidence that no uncommitted data will ever be 
exposed.

##########
File path: 
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java
##########
@@ -546,7 +571,10 @@ private void testUpsertsUpdatePartitionPath(IndexType 
indexType, HoodieWriteConf
     HoodieTableMetaClient.initTableType(metaClient.getHadoopConf(), 
metaClient.getBasePath(),
         metaClient.getTableType(), metaClient.getTableConfig().getTableName(), 
metaClient.getArchivePath(),
         metaClient.getTableConfig().getPayloadClass(), VERSION_0);
-    SparkRDDWriteClient client = getHoodieWriteClient(hoodieWriteConfig, 
false);
+    // Set rollback to LAZY so no inflights are deleted
+    
hoodieWriteConfig.getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP,

Review comment:
       We need to do something about this repeated line:
   
   
`getConfig().getProps().put(HoodieCompactionConfig.FAILED_WRITES_CLEANER_POLICY_PROP,
           HoodieCleaningPolicy.HoodieFailedWritesCleaningPolicy.LAZY.name());`
   
   Even if you want to override only selectively, can we add an overload 
`getConfig(boolean)` that decides whether or not we do lazy cleaning of failed 
writes?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieDefaultTimeline.java
##########
@@ -107,7 +107,7 @@ public HoodieTimeline 
filterCompletedAndCompactionInstants() {
   }
 
   @Override
-  public HoodieDefaultTimeline getCommitsAndCompactionTimeline() {
+  public HoodieDefaultTimeline getCommitsCompactionAndReplaceTimeline() {

Review comment:
       How about simply `getWriteTimeline`?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -100,7 +107,9 @@ public static SparkConf registerClasses(SparkConf conf) {
   public boolean commit(String instantTime, JavaRDD<WriteStatus> 
writeStatuses, Option<Map<String, String>> extraMetadata,
                         String commitActionType, Map<String, List<String>> 
partitionToReplacedFileIds) {
     List<HoodieWriteStat> writeStats = 
writeStatuses.map(WriteStatus::getStat).collect();
-    return commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds);
+    boolean isCommitted = commitStats(instantTime, writeStats, extraMetadata, 
commitActionType, partitionToReplacedFileIds);
+    this.hoodieHeartbeatClient.stop(instantTime);

Review comment:
       Hmm, we should probably do this only once, at the superclass level. 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -232,17 +254,18 @@ void emitCommitMetrics(String instantTime, 
HoodieCommitMetadata metadata, String
    * Main API to run bootstrap to hudi.
    */
   public void bootstrap(Option<Map<String, String>> extraMetadata) {
-    if (rollbackPending) {
-      rollBackInflightBootstrap();
-    }
+    // TODO : MULTIWRITER -> check if failed bootstrap files can be cleaned 
later
     HoodieTable<T, I, K, O> table = 
getTableAndInitCtx(WriteOperationType.UPSERT, 
HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
+    if (config.getFailedWritesCleanPolicy() == 
HoodieCleaningPolicy.HoodieFailedWritesCleaningPolicy.EAGER) {

Review comment:
       Maybe pull `HoodieFailedWritesCleaningPolicy` into its own file? 
It would make for a shorter read everywhere.

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/FileCreateUtils.java
##########
@@ -83,6 +83,17 @@ private static void createMetaFile(String basePath, String 
instantTime, String s
     }
   }
 
+  private static void createMetaFile(FileSystem fs, String basePath, String 
instantTime, String suffix) throws IOException {

Review comment:
       Don't we already have helpers that do this? cc @xushiyan 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/HoodieTableMetaClient.java
##########
@@ -217,6 +218,13 @@ public String getMetaAuxiliaryPath() {
     return basePath + File.separator + AUXILIARYFOLDER_NAME;
   }
 
+  /**
+   * @return Heartbeat folder path.
+   */
+  public static String getHeartbeatFolderPath(String basePath) {

Review comment:
       Shouldn't this be under the metaPath? 

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/heartbeat/HoodieHeartbeatClient.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.heartbeat;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import javax.annotation.concurrent.NotThreadSafe;
+import java.io.File;
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.io.Serializable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+
+/**
+ * This class creates heartbeat for hudi client. This heartbeat is used to 
ascertain whether the running job is or not.
+ * NOTE: Due to CPU contention on the driver/client node, the heartbeats could 
be delayed, hence it's important to set
+ *       the value high enough to avoid that possibility.
+ */
+@NotThreadSafe
+public class HoodieHeartbeatClient implements AutoCloseable, Serializable {
+
+  // TODO : Throw new exception from thread
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHeartbeatClient.class);
+
+  // path to the hearbeat folder where all writers are updating their 
heartbeats
+  private final transient FileSystem fs;
+  private final String basePath;
+  private String heartBeatFolderPath;
+  private String currentInstantTime;
+  private int numHeartBeatsForCurrentInstantTime;
+  // heartbeat interval in millis
+  private final long heartbeatIntervalInMillis;
+  private final transient ExecutorService executorService = 
Executors.newSingleThreadExecutor();
+  private volatile boolean shutdownRequested;
+  private boolean isHearbeatStarted = false;
+  private boolean isHeatBeatStopped = false;
+  private transient Future<?> heartBeatFuture;
+  private Long lastHeartBeatTime;
+  // This is required for testing. Problem : if we set 
heartbeatIntervalInSeconds really large, test takes longer, if
+  // we set it small and you are debugging with breakpoint, causes issues. 
Need to implement a Mock in tests
+  private final Boolean skipHeartBeatCheck;
+
+  public HoodieHeartbeatClient(FileSystem fs, String basePath, long 
heartbeatIntervalInSeconds) {
+    ValidationUtils.checkArgument(heartbeatIntervalInSeconds >= 1, "Cannot set 
heartbeat lower than 1 second");
+    this.fs = fs;
+    this.basePath = basePath;
+    this.heartBeatFolderPath = 
HoodieTableMetaClient.getHeartbeatFolderPath(basePath);
+    this.heartbeatIntervalInMillis = heartbeatIntervalInSeconds * 1000L;
+    this.skipHeartBeatCheck = true;
+  }
+
+  public void start(String instantTime) {
+    LOG.info("Received request to start heartbeat for instant time " + 
instantTime);
+    if (isHearbeatStarted && instantTime == currentInstantTime) {
+      // heartbeat already started, NO_OP
+      return;
+    } else if (instantTime != currentInstantTime) {
+      if (this.currentInstantTime != null) {
+        LOG.warn("Stopping heartbeat for previous instant time " + 
this.currentInstantTime);
+        this.stop(this.currentInstantTime);
+      }
+      this.currentInstantTime = instantTime;
+      this.numHeartBeatsForCurrentInstantTime = 0;
+      this.shutdownRequested = false;
+    }
+    try {
+      updateHeartbeat(instantTime);
+    } catch (Exception e) {
+      throw new RuntimeException("Unable to generate heartbeat");
+    }
+    this.heartBeatFuture = executorService.submit(() -> {
+      while (!shutdownRequested) {
+        try {
+          Thread.sleep(this.heartbeatIntervalInMillis);
+          updateHeartbeat(instantTime);
+        } catch (InterruptedIOException | InterruptedException ioe) {
+          LOG.warn("Thread controlling heartbeat was interrupted");
+        } catch (IOException ioe) {
+          LOG.error("Unable to create heartbeat file", ioe);
+          throw new RuntimeException(ioe);
+        }
+      }
+    });
+    this.isHearbeatStarted = true;
+    this.isHeatBeatStopped = false;
+  }
+
+  public void stop(String instantTime) throws HoodieException {
+    this.shutdownRequested = true;
+    if (isHearbeatStarted && !isHeatBeatStopped && this.heartBeatFuture != 
null) {
+      // TODO : Substract the amount of time it takes to execute 
updateHeartbeat from the sleep time to avoid race conditions
+      boolean isCancelled = this.heartBeatFuture.cancel(true);
+      if (!isCancelled) {
+        throw new HoodieException("Could not stop heartbeat client");
+      }
+      this.isHeatBeatStopped = true;
+      LOG.info("Stopping heartbeat for instant " + instantTime);
+    }
+  }
+
+  public void stop() throws HoodieException {
+    if (this.currentInstantTime != null) {
+      stop(this.currentInstantTime);
+    }
+  }
+
+  public boolean delete(String instantTime) throws IOException {
+    // TODO ensure that we are not deleting the current instant heartbeat, 
this might need storing the instant time
+    // and implementing another heartbeat utils class for everything else
+    if (this.currentInstantTime == instantTime && !this.isHeatBeatStopped) {
+      LOG.error("Cannot delete a currently running heartbeat, stop it first");
+      return false;
+    } else {
+      boolean deleted = this.fs.delete(new Path(heartBeatFolderPath + 
File.separator + instantTime), false);
+      if (!deleted) {
+        LOG.error("Failed to delete heartbeat for instant " + instantTime);
+      }
+      return deleted;
+    }
+  }
+
+  public static Long getLastHeartbeat(FileSystem fs, String basePath, String 
instantTime) throws IOException {
+    Path heartBeatFilePath = new 
Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + File.separator + 
instantTime);
+    if (fs.exists(heartBeatFilePath)) {
+      return fs.getFileStatus(heartBeatFilePath).getModificationTime();
+    } else {
+      // NOTE : This can happen when a writer is upgraded to use lazy cleaning 
and the last write had failed
+      return 0L;
+    }
+  }
+
+  public boolean checkIfConcurrentWriterRunning(String instantTime) throws 
IOException {

Review comment:
       This leaks upper-level context (multi-writing) into a lower-level 
class. We should just name this something around heartbeats, say 
`isHeartbeatExpired()`. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/model/MultiWriterConcurrencyModel.java
##########
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.model;
+
+public enum MultiWriterConcurrencyModel {
+  LEGACY_SINGLE_WRITER,

Review comment:
       Rename to `SINGLE_WRITER`, dropping the `LEGACY_` prefix. The model has its merits.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to