This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 28cf81b2498 [HUDI-6371] Indexing catchup tasks should handle failed 
commits based on heartbeat (#12369)
28cf81b2498 is described below

commit 28cf81b24984aa4c77a441b1e35e57c8f670b85b
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Nov 30 03:39:38 2024 +0530

    [HUDI-6371] Indexing catchup tasks should handle failed commits based on 
heartbeat (#12369)
---
 .../action/index/AbstractIndexingCatchupTask.java  |  53 +++++++--
 .../action/index/IndexingCatchupTaskFactory.java   |  16 ++-
 .../index/RecordBasedIndexingCatchupTask.java      |  44 ++------
 .../table/action/index/RunIndexActionExecutor.java |   6 +-
 .../index/WriteStatBasedIndexingCatchupTask.java   |   8 +-
 .../action/index/TestIndexingCatchupTask.java      | 119 +++++++++++++++++++--
 .../apache/hudi/utilities/TestHoodieIndexer.java   |   8 +-
 7 files changed, 190 insertions(+), 64 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
index 1c4ccc35bee..bdab87e6bb3 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
@@ -22,6 +22,7 @@ package org.apache.hudi.table.action.index;
 import org.apache.hudi.avro.model.HoodieCleanMetadata;
 import org.apache.hudi.avro.model.HoodieRestoreMetadata;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -30,8 +31,10 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
 import org.apache.hudi.common.util.CleanerUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,6 +64,8 @@ public abstract class AbstractIndexingCatchupTask implements 
IndexingCatchupTask
   protected final HoodieTableMetaClient metadataMetaClient;
   protected final TransactionManager transactionManager;
   protected final HoodieEngineContext engineContext;
+  protected final HoodieTable table;
+  protected final HoodieHeartbeatClient heartbeatClient;
   protected String currentCaughtupInstant;
 
   public AbstractIndexingCatchupTask(HoodieTableMetadataWriter metadataWriter,
@@ -70,7 +75,9 @@ public abstract class AbstractIndexingCatchupTask implements 
IndexingCatchupTask
                                      HoodieTableMetaClient metadataMetaClient,
                                      TransactionManager transactionManager,
                                      String currentCaughtupInstant,
-                                     HoodieEngineContext engineContext) {
+                                     HoodieEngineContext engineContext,
+                                     HoodieTable table,
+                                     HoodieHeartbeatClient heartbeatClient) {
     this.metadataWriter = metadataWriter;
     this.instantsToIndex = instantsToIndex;
     this.metadataCompletedInstants = metadataCompletedInstants;
@@ -79,14 +86,15 @@ public abstract class AbstractIndexingCatchupTask 
implements IndexingCatchupTask
     this.transactionManager = transactionManager;
     this.currentCaughtupInstant = currentCaughtupInstant;
     this.engineContext = engineContext;
+    this.table = table;
+    this.heartbeatClient = heartbeatClient;
   }
 
   @Override
   public void run() {
     for (HoodieInstant instant : instantsToIndex) {
-      // metadata index already updated for this instant
-      instant = awaitInstantCaughtUp(instant);
-      if (instant == null) {
+      // Already caught up to this instant, or no heartbeat, or heartbeat 
expired for this instant
+      if (awaitInstantCaughtUp(instant)) {
         continue;
       }
       // if instant completed, ensure that there was metadata commit, else 
update metadata for this completed instant
@@ -145,16 +153,45 @@ public abstract class AbstractIndexingCatchupTask 
implements IndexingCatchupTask
   /**
    * For the given instant, this method checks if it is already caught up or 
not.
    * If not, it waits until the instant is completed.
+   * <p>
+   * 1. single writer.
+   * a. pending ingestion commit: If no heartbeat, then we are good to ignore.
+   * b. pending table service commit: There won't be any heartbeat. If no 
heartbeat, then we are good to ignore (strictly assuming single writer and 
inline table service).
+   * <p>
+   * 2. streamer + async table service.
+   * a. pending ingestion commit: If no heartbeat, then we are good to ignore.
+   * b. pending table service commit: There won't be any heartbeat. If no 
heartbeat, then we are good to ignore because we assume that user stops the 
main writer to create the index.
+   * <p>
+   * 3. Multi-writer scenarios:
+   * a. Spark datasource ingestion (OR streamer all inline) going on. User is 
trying to build index via spark-sql concurrently (w/o stopping the main writer)
+   * b. deltastreamer + async table services ongoing. User concurrently builds 
the index via spark-sql.
+   * c. multi-writer spark-ds writers. User is trying to build index via 
spark-sql concurrently (w/o stopping the all other writer)
+   * For new indexes added in 1.0.0, these flows are experimental. TODO: 
HUDI-8607.
    *
    * @param instant HoodieInstant to check
-   * @return null if instant is already caught up, else the instant after it 
is completed.
+   * @return True if instant is already caught up, or no heartbeat, or expired 
heartbeat. If heartbeat exists and not expired, then return false.
    */
-  HoodieInstant awaitInstantCaughtUp(HoodieInstant instant) {
+  boolean awaitInstantCaughtUp(HoodieInstant instant) {
     if (!metadataCompletedInstants.isEmpty() && 
metadataCompletedInstants.contains(instant.requestedTime())) {
       currentCaughtupInstant = instant.requestedTime();
-      return null;
+      return true;
     }
     if (!instant.isCompleted()) {
+      // check heartbeat
+      try {
+        // if no heartbeat, then ignore this instant
+        if (!HoodieHeartbeatClient.heartbeatExists(metaClient.getStorage(), 
metaClient.getBasePath().toString(), instant.requestedTime())) {
+          LOG.info("Ignoring instant " + instant + " as no heartbeat found");
+          return true;
+        }
+        // if heartbeat exists, but expired, then ignore this instant
+        if (table.getConfig().getFailedWritesCleanPolicy().isLazy() && 
heartbeatClient.isHeartbeatExpired(instant.requestedTime())) {
+          LOG.info("Ignoring instant " + instant + " as heartbeat expired");
+          return true;
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException("Unable to check if heartbeat expired for 
instant " + instant, e);
+      }
       try {
         LOG.warn("instant not completed, reloading timeline " + instant);
         reloadTimelineWithWait(instant);
@@ -162,7 +199,7 @@ public abstract class AbstractIndexingCatchupTask 
implements IndexingCatchupTask
         throw new HoodieIndexException(String.format("Thread interrupted while 
running indexing check for instant: %s", instant), e);
       }
     }
-    return instant;
+    return false;
   }
 
   private void reloadTimelineWithWait(HoodieInstant instant) throws 
InterruptedException {
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java
index 173ab5ba000..324356e34fc 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java
@@ -20,12 +20,14 @@
 package org.apache.hudi.table.action.index;
 
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
 
 import java.util.List;
 import java.util.Set;
@@ -36,11 +38,13 @@ public class IndexingCatchupTaskFactory {
                                                       
HoodieTableMetadataWriter metadataWriter,
                                                       List<HoodieInstant> 
instantsToIndex,
                                                       Set<String> 
metadataCompletedInstants,
-                                                      HoodieTableMetaClient 
metaClient,
+                                                      HoodieTable table,
                                                       HoodieTableMetaClient 
metadataMetaClient,
                                                       String 
currentCaughtupInstant,
                                                       TransactionManager 
transactionManager,
-                                                      HoodieEngineContext 
engineContext) {
+                                                      HoodieEngineContext 
engineContext,
+                                                      HoodieHeartbeatClient 
heartbeatClient) {
+    HoodieTableMetaClient metaClient = table.getMetaClient();
     boolean hasRecordLevelIndexing = indexPartitionInfos.stream()
         .anyMatch(partitionInfo -> 
partitionInfo.getMetadataPartitionPath().equals(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
     if (hasRecordLevelIndexing) {
@@ -52,7 +56,9 @@ public class IndexingCatchupTaskFactory {
           metadataMetaClient,
           currentCaughtupInstant,
           transactionManager,
-          engineContext);
+          engineContext,
+          table,
+          heartbeatClient);
     } else {
       return new WriteStatBasedIndexingCatchupTask(
           metadataWriter,
@@ -62,7 +68,9 @@ public class IndexingCatchupTaskFactory {
           metadataMetaClient,
           currentCaughtupInstant,
           transactionManager,
-          engineContext);
+          engineContext,
+          table,
+          heartbeatClient);
     }
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RecordBasedIndexingCatchupTask.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RecordBasedIndexingCatchupTask.java
index da91af0c770..cd71e71d3a8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RecordBasedIndexingCatchupTask.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RecordBasedIndexingCatchupTask.java
@@ -19,24 +19,16 @@
 
 package org.apache.hudi.table.action.index;
 
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.transaction.TransactionManager;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.FileSlice;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
-import org.apache.hudi.metadata.HoodieTableMetadata;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 
@@ -52,38 +44,16 @@ public class RecordBasedIndexingCatchupTask extends 
AbstractIndexingCatchupTask
                                         HoodieTableMetaClient 
metadataMetaClient,
                                         String currentCaughtupInstant,
                                         TransactionManager transactionManager,
-                                        HoodieEngineContext engineContext) {
-    super(metadataWriter, instantsToIndex, metadataCompletedInstants, 
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant, 
engineContext);
+                                        HoodieEngineContext engineContext,
+                                        HoodieTable table,
+                                        HoodieHeartbeatClient heartbeatClient) 
{
+    super(metadataWriter, instantsToIndex, metadataCompletedInstants, 
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant, 
engineContext, table, heartbeatClient);
   }
 
   @Override
   public void updateIndexForWriteAction(HoodieInstant instant) throws 
IOException {
     HoodieCommitMetadata commitMetadata = 
metaClient.getCommitMetadataSerDe().deserialize(instant,
         metaClient.getActiveTimeline().getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
-    HoodieData<HoodieRecord> records = readRecordKeysFromFileSlices(instant);
-    metadataWriter.update(commitMetadata, records, instant.requestedTime());
-  }
-
-  private HoodieData<HoodieRecord> readRecordKeysFromFileSlices(HoodieInstant 
instant) throws IOException {
-    HoodieMetadataConfig metadataConfig = 
HoodieMetadataConfig.newBuilder().enable(true).build();
-    HoodieTableMetadata metadata = HoodieTableMetadata.create(
-        engineContext, metaClient.getStorage(), metadataConfig, 
metaClient.getBasePath().toString(), false);
-    HoodieMetadataFileSystemView fsView = new 
HoodieMetadataFileSystemView(metaClient, 
metaClient.getActiveTimeline().filter(i -> i.equals(instant)), metadata);
-    // Collect the list of latest file slices present in each partition
-    List<String> partitions = metadata.getAllPartitionPaths();
-    fsView.loadAllPartitions();
-    final List<Pair<String, FileSlice>> partitionFileSlicePairs = new 
ArrayList<>();
-    for (String partition : partitions) {
-      fsView.getLatestFileSlices(partition).forEach(fs -> 
partitionFileSlicePairs.add(Pair.of(partition, fs)));
-    }
-
-    return HoodieTableMetadataUtil.readRecordKeysFromFileSlices(
-        engineContext,
-        partitionFileSlicePairs,
-        false,
-        metadataConfig.getRecordIndexMaxParallelism(),
-        this.getClass().getSimpleName(),
-        metaClient,
-        EngineType.SPARK);
+    metadataWriter.update(commitMetadata, instant.requestedTime());
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 8c46c498aa2..370f019cc85 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -22,6 +22,7 @@ package org.apache.hudi.table.action.index;
 import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
 import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
 import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -279,10 +280,12 @@ public class RunIndexActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I,
   private void catchupWithInflightWriters(HoodieTableMetadataWriter 
metadataWriter, List<HoodieInstant> instantsToIndex,
                                           HoodieTableMetaClient 
metadataMetaClient, Set<String> metadataCompletedTimestamps,
                                           List<HoodieIndexPartitionInfo> 
indexPartitionInfos) {
+    HoodieHeartbeatClient heartbeatClient = new 
HoodieHeartbeatClient(table.getStorage(), 
table.getMetaClient().getBasePath().toString(),
+        table.getConfig().getHoodieClientHeartbeatIntervalInMs(), 
table.getConfig().getHoodieClientHeartbeatTolerableMisses());
     ExecutorService executorService = 
Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
     Future<?> indexingCatchupTaskFuture = executorService.submit(
         IndexingCatchupTaskFactory.createCatchupTask(indexPartitionInfos, 
metadataWriter, instantsToIndex, metadataCompletedTimestamps,
-            table.getMetaClient(), metadataMetaClient, currentCaughtupInstant, 
txnManager, context));
+            table, metadataMetaClient, currentCaughtupInstant, txnManager, 
context, heartbeatClient));
     try {
       LOG.info("Starting index catchup task");
       HoodieTimer timer = HoodieTimer.start();
@@ -293,6 +296,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends 
BaseActionExecutor<T, I,
       throw new HoodieIndexException(String.format("Index catchup failed. 
Current indexed instant = %s. Aborting!", currentCaughtupInstant), e);
     } finally {
       executorService.shutdownNow();
+      heartbeatClient.close();
     }
   }
 
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
index 25230049a76..6d475b76b04 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
@@ -19,12 +19,14 @@
 
 package org.apache.hudi.table.action.index;
 
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
 
 import java.io.IOException;
 import java.util.List;
@@ -42,8 +44,10 @@ public class WriteStatBasedIndexingCatchupTask extends 
AbstractIndexingCatchupTa
                                            HoodieTableMetaClient 
metadataMetaClient,
                                            String currentCaughtupInstant,
                                            TransactionManager txnManager,
-                                           HoodieEngineContext engineContext) {
-    super(metadataWriter, instantsToIndex, metadataCompletedInstants, 
metaClient, metadataMetaClient, txnManager, currentCaughtupInstant, 
engineContext);
+                                           HoodieEngineContext engineContext,
+                                           HoodieTable table,
+                                           HoodieHeartbeatClient 
heartbeatClient) {
+    super(metadataWriter, instantsToIndex, metadataCompletedInstants, 
metaClient, metadataMetaClient, txnManager, currentCaughtupInstant, 
engineContext, table, heartbeatClient);
   }
 
   @Override
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/index/TestIndexingCatchupTask.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/index/TestIndexingCatchupTask.java
index 6fa0dcc0802..ca34060694d 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/index/TestIndexingCatchupTask.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/index/TestIndexingCatchupTask.java
@@ -19,20 +19,28 @@
 
 package org.apache.hudi.table.action.index;
 
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
 import org.apache.hudi.client.transaction.TransactionManager;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieIndexException;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
 
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -42,6 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -59,6 +68,10 @@ public class TestIndexingCatchupTask {
   private TransactionManager transactionManager;
   @Mock
   private HoodieEngineContext engineContext;
+  @Mock
+  private HoodieTable table;
+  @Mock
+  private HoodieHeartbeatClient heartbeatClient;
 
   @BeforeEach
   public void setup() {
@@ -69,9 +82,16 @@ public class TestIndexingCatchupTask {
    * Mock out the behavior of the method to mimic a regular successful run
    */
   @Test
-  public void testTaskSuccessful() {
+  public void testTaskSuccessful() throws IOException {
     List<HoodieInstant> instants = 
Collections.singletonList(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
 "commit", "001"));
     Set<String> metadataCompletedInstants = new HashSet<>();
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/some/path")
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .build();
+    // Simulate lazy clean policy with a heartbeat that has not expired
+    when(table.getConfig()).thenReturn(writeConfig);
+    when(heartbeatClient.isHeartbeatExpired("002")).thenReturn(false);
     AbstractIndexingCatchupTask task = new DummyIndexingCatchupTask(
         metadataWriter,
         instants,
@@ -80,7 +100,9 @@ public class TestIndexingCatchupTask {
         metadataMetaClient,
         transactionManager,
         "001",
-        engineContext);
+        engineContext,
+        table,
+        heartbeatClient);
 
     task.run();
     assertEquals("001", task.currentCaughtupInstant);
@@ -90,7 +112,7 @@ public class TestIndexingCatchupTask {
    * Instant never gets completed, and we interrupt the task to see if it 
throws the expected HoodieIndexException.
    */
   @Test
-  public void testTaskInterrupted() {
+  public void testTaskInterrupted() throws IOException {
     HoodieInstant neverCompletedInstant = 
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, "commit", 
"001");
     HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
     HoodieActiveTimeline filteredTimeline = mock(HoodieActiveTimeline.class);
@@ -107,6 +129,18 @@ public class TestIndexingCatchupTask {
       return Option.empty();
     });
 
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/some/path")
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .build();
+    // Simulate heartbeat exists and not expired
+    when(table.getConfig()).thenReturn(writeConfig);
+    HoodieStorage storage = mock(HoodieStorage.class);
+    when(metaClient.getStorage()).thenReturn(storage);
+    when(metaClient.getBasePath()).thenReturn(new StoragePath("/some/path"));
+    when(storage.exists(any())).thenReturn(true);
+    when(heartbeatClient.isHeartbeatExpired("001")).thenReturn(false);
+
     AbstractIndexingCatchupTask task = new DummyIndexingCatchupTask(
         metadataWriter,
         Collections.singletonList(neverCompletedInstant),
@@ -115,7 +149,9 @@ public class TestIndexingCatchupTask {
         metadataMetaClient,
         transactionManager,
         "001",
-        engineContext);
+        engineContext,
+        table,
+        heartbeatClient);
 
     // simulate catchup task timeout
     CountDownLatch latch = new CountDownLatch(1);
@@ -135,6 +171,75 @@ public class TestIndexingCatchupTask {
     }
   }
 
+  /**
+   * Test case to cover heartbeat expiry. Validate that awaitInstantCaughtUp
+   * returns true when heartbeat has expired for the given instant.
+   */
+  @Test
+  public void testHeartbeatExpired() throws IOException {
+    HoodieInstant expiredInstant = 
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, "commit", 
"002");
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/some/path")
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .build();
+    // Simulate heartbeat exists and expired
+    when(table.getConfig()).thenReturn(writeConfig);
+    HoodieStorage storage = mock(HoodieStorage.class);
+    when(metaClient.getStorage()).thenReturn(storage);
+    when(metaClient.getBasePath()).thenReturn(new StoragePath("/some/path"));
+    when(storage.exists(any())).thenReturn(true);
+    when(heartbeatClient.isHeartbeatExpired("002")).thenReturn(true);
+
+    AbstractIndexingCatchupTask task = new DummyIndexingCatchupTask(
+        metadataWriter,
+        Collections.singletonList(expiredInstant),
+        new HashSet<>(),
+        metaClient,
+        metadataMetaClient,
+        transactionManager,
+        "001",
+        engineContext,
+        table,
+        heartbeatClient
+    );
+
+    assertTrue(task.awaitInstantCaughtUp(expiredInstant), "Expected true as 
the instant's heartbeat has expired.");
+  }
+
+  /**
+   * Test case to cover the scenario where the heartbeat does not exist for 
the given instant.
+   * Validate that awaitInstantCaughtUp returns true when heartbeat does not 
exist for the given instant.
+   */
+  @Test
+  public void testNoHeartbeat() throws IOException {
+    HoodieInstant pendingInstantWithNoHeartbeat = 
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, "commit", 
"002");
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/some/path")
+        
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+        .build();
+    // Simulate heartbeat not existing for the instant
+    when(table.getConfig()).thenReturn(writeConfig);
+    HoodieStorage storage = mock(HoodieStorage.class);
+    when(metaClient.getStorage()).thenReturn(storage);
+    when(metaClient.getBasePath()).thenReturn(new StoragePath("/some/path"));
+    when(storage.exists(any())).thenReturn(false);
+
+    AbstractIndexingCatchupTask task = new DummyIndexingCatchupTask(
+        metadataWriter,
+        Collections.singletonList(pendingInstantWithNoHeartbeat),
+        new HashSet<>(),
+        metaClient,
+        metadataMetaClient,
+        transactionManager,
+        "001",
+        engineContext,
+        table,
+        heartbeatClient
+    );
+
+    assertTrue(task.awaitInstantCaughtUp(pendingInstantWithNoHeartbeat), 
"Expected true as no heartbeat exists for the instant.");
+  }
+
   static class DummyIndexingCatchupTask extends AbstractIndexingCatchupTask {
     public DummyIndexingCatchupTask(HoodieTableMetadataWriter metadataWriter,
                                     List<HoodieInstant> instantsToIndex,
@@ -143,8 +248,10 @@ public class TestIndexingCatchupTask {
                                     HoodieTableMetaClient metadataMetaClient,
                                     TransactionManager transactionManager,
                                     String currentCaughtupInstant,
-                                    HoodieEngineContext engineContext) {
-      super(metadataWriter, instantsToIndex, metadataCompletedInstants, 
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant, 
engineContext);
+                                    HoodieEngineContext engineContext,
+                                    HoodieTable table,
+                                    HoodieHeartbeatClient heartbeatClient) {
+      super(metadataWriter, instantsToIndex, metadataCompletedInstants, 
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant, 
engineContext, table, heartbeatClient);
     }
 
     @Override
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 11f6036fc22..dd103ed859d 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -39,7 +39,6 @@ import 
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieMetadataException;
 import org.apache.hudi.metadata.HoodieBackedTableMetadata;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
 import org.apache.hudi.metadata.MetadataPartitionType;
@@ -291,11 +290,8 @@ public class TestHoodieIndexer extends 
SparkClientFunctionalTestHarness implemen
 
     // start the indexer and validate files index is completely built out
     HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
-    // The catchup won't finish due to inflight delta commit, and this is 
expected
-    Throwable cause = assertThrows(RuntimeException.class, () ->  
indexer.start(0))
-        .getCause();
-    assertTrue(cause instanceof HoodieMetadataException);
-    assertTrue(cause.getMessage().contains("Failed to index partition"));
+    // The catchup must finish even with inflight delta commit
+    assertEquals(0, indexer.start(0));
 
     // Now, make sure that the inflight delta commit happened before the async 
indexer
     // is intact

Reply via email to