This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 28cf81b2498 [HUDI-6371] Indexing catchup tasks should handle failed
commits based on heartbeat (#12369)
28cf81b2498 is described below
commit 28cf81b24984aa4c77a441b1e35e57c8f670b85b
Author: Sagar Sumit <[email protected]>
AuthorDate: Sat Nov 30 03:39:38 2024 +0530
[HUDI-6371] Indexing catchup tasks should handle failed commits based on
heartbeat (#12369)
---
.../action/index/AbstractIndexingCatchupTask.java | 53 +++++++--
.../action/index/IndexingCatchupTaskFactory.java | 16 ++-
.../index/RecordBasedIndexingCatchupTask.java | 44 ++------
.../table/action/index/RunIndexActionExecutor.java | 6 +-
.../index/WriteStatBasedIndexingCatchupTask.java | 8 +-
.../action/index/TestIndexingCatchupTask.java | 119 +++++++++++++++++++--
.../apache/hudi/utilities/TestHoodieIndexer.java | 8 +-
7 files changed, 190 insertions(+), 64 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
index 1c4ccc35bee..bdab87e6bb3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/AbstractIndexingCatchupTask.java
@@ -22,6 +22,7 @@ package org.apache.hudi.table.action.index;
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieRestoreMetadata;
import org.apache.hudi.avro.model.HoodieRollbackMetadata;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -30,8 +31,10 @@ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,6 +64,8 @@ public abstract class AbstractIndexingCatchupTask implements
IndexingCatchupTask
protected final HoodieTableMetaClient metadataMetaClient;
protected final TransactionManager transactionManager;
protected final HoodieEngineContext engineContext;
+ protected final HoodieTable table;
+ protected final HoodieHeartbeatClient heartbeatClient;
protected String currentCaughtupInstant;
public AbstractIndexingCatchupTask(HoodieTableMetadataWriter metadataWriter,
@@ -70,7 +75,9 @@ public abstract class AbstractIndexingCatchupTask implements
IndexingCatchupTask
HoodieTableMetaClient metadataMetaClient,
TransactionManager transactionManager,
String currentCaughtupInstant,
- HoodieEngineContext engineContext) {
+ HoodieEngineContext engineContext,
+ HoodieTable table,
+ HoodieHeartbeatClient heartbeatClient) {
this.metadataWriter = metadataWriter;
this.instantsToIndex = instantsToIndex;
this.metadataCompletedInstants = metadataCompletedInstants;
@@ -79,14 +86,15 @@ public abstract class AbstractIndexingCatchupTask
implements IndexingCatchupTask
this.transactionManager = transactionManager;
this.currentCaughtupInstant = currentCaughtupInstant;
this.engineContext = engineContext;
+ this.table = table;
+ this.heartbeatClient = heartbeatClient;
}
@Override
public void run() {
for (HoodieInstant instant : instantsToIndex) {
- // metadata index already updated for this instant
- instant = awaitInstantCaughtUp(instant);
- if (instant == null) {
+ // Already caught up to this instant, or no heartbeat, or heartbeat
expired for this instant
+ if (awaitInstantCaughtUp(instant)) {
continue;
}
// if instant completed, ensure that there was metadata commit, else
update metadata for this completed instant
@@ -145,16 +153,45 @@ public abstract class AbstractIndexingCatchupTask
implements IndexingCatchupTask
/**
* For the given instant, this method checks if it is already caught up or
not.
* If not, it waits until the instant is completed.
+ * <p>
+ * 1. single writer.
+ * a. pending ingestion commit: If no heartbeat, then we are good to ignore.
+ * b. pending table service commit: There won't be any heartbeat. If no
heartbeat, then we are good to ignore (strictly assuming single writer and
inline table service).
+ * <p>
+ * 2. streamer + async table service.
+ * a. pending ingestion commit: If no heartbeat, then we are good to ignore.
+ * b. pending table service commit: There won't be any heartbeat. If no
heartbeat, then we are good to ignore because we assume that user stops the
main writer to create the index.
+ * <p>
+ * 3. Multi-writer scenarios:
+ * a. Spark datasource ingestion (OR streamer all inline) going on. User is
trying to build index via spark-sql concurrently (w/o stopping the main writer)
+ * b. deltastreamer + async table services ongoing. User concurrently builds
the index via spark-sql.
+ * c. multi-writer spark-ds writers. User is trying to build index via
spark-sql concurrently (w/o stopping the all other writer)
+ * For new indexes added in 1.0.0, these flows are experimental. TODO:
HUDI-8607.
*
* @param instant HoodieInstant to check
- * @return null if instant is already caught up, else the instant after it
is completed.
+ * @return True if instant is already caught up, or no heartbeat, or expired
heartbeat. If heartbeat exists and not expired, then return false.
*/
- HoodieInstant awaitInstantCaughtUp(HoodieInstant instant) {
+ boolean awaitInstantCaughtUp(HoodieInstant instant) {
if (!metadataCompletedInstants.isEmpty() &&
metadataCompletedInstants.contains(instant.requestedTime())) {
currentCaughtupInstant = instant.requestedTime();
- return null;
+ return true;
}
if (!instant.isCompleted()) {
+ // check heartbeat
+ try {
+ // if no heartbeat, then ignore this instant
+ if (!HoodieHeartbeatClient.heartbeatExists(metaClient.getStorage(),
metaClient.getBasePath().toString(), instant.requestedTime())) {
+ LOG.info("Ignoring instant " + instant + " as no heartbeat found");
+ return true;
+ }
+ // if heartbeat exists, but expired, then ignore this instant
+ if (table.getConfig().getFailedWritesCleanPolicy().isLazy() &&
heartbeatClient.isHeartbeatExpired(instant.requestedTime())) {
+ LOG.info("Ignoring instant " + instant + " as heartbeat expired");
+ return true;
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Unable to check if heartbeat expired for
instant " + instant, e);
+ }
try {
LOG.warn("instant not completed, reloading timeline " + instant);
reloadTimelineWithWait(instant);
@@ -162,7 +199,7 @@ public abstract class AbstractIndexingCatchupTask
implements IndexingCatchupTask
throw new HoodieIndexException(String.format("Thread interrupted while
running indexing check for instant: %s", instant), e);
}
}
- return instant;
+ return false;
}
private void reloadTimelineWithWait(HoodieInstant instant) throws
InterruptedException {
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java
index 173ab5ba000..324356e34fc 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/IndexingCatchupTaskFactory.java
@@ -20,12 +20,14 @@
package org.apache.hudi.table.action.index;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
import org.apache.hudi.metadata.MetadataPartitionType;
+import org.apache.hudi.table.HoodieTable;
import java.util.List;
import java.util.Set;
@@ -36,11 +38,13 @@ public class IndexingCatchupTaskFactory {
HoodieTableMetadataWriter metadataWriter,
List<HoodieInstant>
instantsToIndex,
Set<String>
metadataCompletedInstants,
- HoodieTableMetaClient
metaClient,
+ HoodieTable table,
HoodieTableMetaClient
metadataMetaClient,
String
currentCaughtupInstant,
TransactionManager
transactionManager,
- HoodieEngineContext
engineContext) {
+ HoodieEngineContext
engineContext,
+ HoodieHeartbeatClient
heartbeatClient) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
boolean hasRecordLevelIndexing = indexPartitionInfos.stream()
.anyMatch(partitionInfo ->
partitionInfo.getMetadataPartitionPath().equals(MetadataPartitionType.RECORD_INDEX.getPartitionPath()));
if (hasRecordLevelIndexing) {
@@ -52,7 +56,9 @@ public class IndexingCatchupTaskFactory {
metadataMetaClient,
currentCaughtupInstant,
transactionManager,
- engineContext);
+ engineContext,
+ table,
+ heartbeatClient);
} else {
return new WriteStatBasedIndexingCatchupTask(
metadataWriter,
@@ -62,7 +68,9 @@ public class IndexingCatchupTaskFactory {
metadataMetaClient,
currentCaughtupInstant,
transactionManager,
- engineContext);
+ engineContext,
+ table,
+ heartbeatClient);
}
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RecordBasedIndexingCatchupTask.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RecordBasedIndexingCatchupTask.java
index da91af0c770..cd71e71d3a8 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RecordBasedIndexingCatchupTask.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RecordBasedIndexingCatchupTask.java
@@ -19,24 +19,16 @@
package org.apache.hudi.table.action.index;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
-import org.apache.hudi.common.data.HoodieData;
-import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.metadata.HoodieMetadataFileSystemView;
-import org.apache.hudi.metadata.HoodieTableMetadata;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.List;
import java.util.Set;
@@ -52,38 +44,16 @@ public class RecordBasedIndexingCatchupTask extends
AbstractIndexingCatchupTask
HoodieTableMetaClient
metadataMetaClient,
String currentCaughtupInstant,
TransactionManager transactionManager,
- HoodieEngineContext engineContext) {
- super(metadataWriter, instantsToIndex, metadataCompletedInstants,
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant,
engineContext);
+ HoodieEngineContext engineContext,
+ HoodieTable table,
+ HoodieHeartbeatClient heartbeatClient)
{
+ super(metadataWriter, instantsToIndex, metadataCompletedInstants,
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant,
engineContext, table, heartbeatClient);
}
@Override
public void updateIndexForWriteAction(HoodieInstant instant) throws
IOException {
HoodieCommitMetadata commitMetadata =
metaClient.getCommitMetadataSerDe().deserialize(instant,
metaClient.getActiveTimeline().getInstantDetails(instant).get(),
HoodieCommitMetadata.class);
- HoodieData<HoodieRecord> records = readRecordKeysFromFileSlices(instant);
- metadataWriter.update(commitMetadata, records, instant.requestedTime());
- }
-
- private HoodieData<HoodieRecord> readRecordKeysFromFileSlices(HoodieInstant
instant) throws IOException {
- HoodieMetadataConfig metadataConfig =
HoodieMetadataConfig.newBuilder().enable(true).build();
- HoodieTableMetadata metadata = HoodieTableMetadata.create(
- engineContext, metaClient.getStorage(), metadataConfig,
metaClient.getBasePath().toString(), false);
- HoodieMetadataFileSystemView fsView = new
HoodieMetadataFileSystemView(metaClient,
metaClient.getActiveTimeline().filter(i -> i.equals(instant)), metadata);
- // Collect the list of latest file slices present in each partition
- List<String> partitions = metadata.getAllPartitionPaths();
- fsView.loadAllPartitions();
- final List<Pair<String, FileSlice>> partitionFileSlicePairs = new
ArrayList<>();
- for (String partition : partitions) {
- fsView.getLatestFileSlices(partition).forEach(fs ->
partitionFileSlicePairs.add(Pair.of(partition, fs)));
- }
-
- return HoodieTableMetadataUtil.readRecordKeysFromFileSlices(
- engineContext,
- partitionFileSlicePairs,
- false,
- metadataConfig.getRecordIndexMaxParallelism(),
- this.getClass().getSimpleName(),
- metaClient,
- EngineType.SPARK);
+ metadataWriter.update(commitMetadata, instant.requestedTime());
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
index 8c46c498aa2..370f019cc85 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/RunIndexActionExecutor.java
@@ -22,6 +22,7 @@ package org.apache.hudi.table.action.index;
import org.apache.hudi.avro.model.HoodieIndexCommitMetadata;
import org.apache.hudi.avro.model.HoodieIndexPartitionInfo;
import org.apache.hudi.avro.model.HoodieIndexPlan;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -279,10 +280,12 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
private void catchupWithInflightWriters(HoodieTableMetadataWriter
metadataWriter, List<HoodieInstant> instantsToIndex,
HoodieTableMetaClient
metadataMetaClient, Set<String> metadataCompletedTimestamps,
List<HoodieIndexPartitionInfo>
indexPartitionInfos) {
+ HoodieHeartbeatClient heartbeatClient = new
HoodieHeartbeatClient(table.getStorage(),
table.getMetaClient().getBasePath().toString(),
+ table.getConfig().getHoodieClientHeartbeatIntervalInMs(),
table.getConfig().getHoodieClientHeartbeatTolerableMisses());
ExecutorService executorService =
Executors.newFixedThreadPool(MAX_CONCURRENT_INDEXING);
Future<?> indexingCatchupTaskFuture = executorService.submit(
IndexingCatchupTaskFactory.createCatchupTask(indexPartitionInfos,
metadataWriter, instantsToIndex, metadataCompletedTimestamps,
- table.getMetaClient(), metadataMetaClient, currentCaughtupInstant,
txnManager, context));
+ table, metadataMetaClient, currentCaughtupInstant, txnManager,
context, heartbeatClient));
try {
LOG.info("Starting index catchup task");
HoodieTimer timer = HoodieTimer.start();
@@ -293,6 +296,7 @@ public class RunIndexActionExecutor<T, I, K, O> extends
BaseActionExecutor<T, I,
throw new HoodieIndexException(String.format("Index catchup failed.
Current indexed instant = %s. Aborting!", currentCaughtupInstant), e);
} finally {
executorService.shutdownNow();
+ heartbeatClient.close();
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
index 25230049a76..6d475b76b04 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/index/WriteStatBasedIndexingCatchupTask.java
@@ -19,12 +19,14 @@
package org.apache.hudi.table.action.index;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.table.HoodieTable;
import java.io.IOException;
import java.util.List;
@@ -42,8 +44,10 @@ public class WriteStatBasedIndexingCatchupTask extends
AbstractIndexingCatchupTa
HoodieTableMetaClient
metadataMetaClient,
String currentCaughtupInstant,
TransactionManager txnManager,
- HoodieEngineContext engineContext) {
- super(metadataWriter, instantsToIndex, metadataCompletedInstants,
metaClient, metadataMetaClient, txnManager, currentCaughtupInstant,
engineContext);
+ HoodieEngineContext engineContext,
+ HoodieTable table,
+ HoodieHeartbeatClient
heartbeatClient) {
+ super(metadataWriter, instantsToIndex, metadataCompletedInstants,
metaClient, metadataMetaClient, txnManager, currentCaughtupInstant,
engineContext, table, heartbeatClient);
}
@Override
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/index/TestIndexingCatchupTask.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/index/TestIndexingCatchupTask.java
index 6fa0dcc0802..ca34060694d 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/index/TestIndexingCatchupTask.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/table/action/index/TestIndexingCatchupTask.java
@@ -19,20 +19,28 @@
package org.apache.hudi.table.action.index;
+import org.apache.hudi.client.heartbeat.HoodieHeartbeatClient;
import org.apache.hudi.client.transaction.TransactionManager;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCleanConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.storage.HoodieStorage;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
+import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
@@ -42,6 +50,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
@@ -59,6 +68,10 @@ public class TestIndexingCatchupTask {
private TransactionManager transactionManager;
@Mock
private HoodieEngineContext engineContext;
+ @Mock
+ private HoodieTable table;
+ @Mock
+ private HoodieHeartbeatClient heartbeatClient;
@BeforeEach
public void setup() {
@@ -69,9 +82,16 @@ public class TestIndexingCatchupTask {
* Mock out the behavior of the method to mimic a regular successful run
*/
@Test
- public void testTaskSuccessful() {
+ public void testTaskSuccessful() throws IOException {
List<HoodieInstant> instants =
Collections.singletonList(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED,
"commit", "001"));
Set<String> metadataCompletedInstants = new HashSet<>();
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/some/path")
+
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .build();
+ // Simulate lazy clean policy and heartbeat not expired
+ when(table.getConfig()).thenReturn(writeConfig);
+ when(heartbeatClient.isHeartbeatExpired("001")).thenReturn(false);
AbstractIndexingCatchupTask task = new DummyIndexingCatchupTask(
metadataWriter,
instants,
@@ -80,7 +100,9 @@ public class TestIndexingCatchupTask {
metadataMetaClient,
transactionManager,
"001",
- engineContext);
+ engineContext,
+ table,
+ heartbeatClient);
task.run();
assertEquals("001", task.currentCaughtupInstant);
@@ -90,7 +112,7 @@ public class TestIndexingCatchupTask {
* Instant never gets completed, and we interrupt the task to see if it
throws the expected HoodieIndexException.
*/
@Test
- public void testTaskInterrupted() {
+ public void testTaskInterrupted() throws IOException {
HoodieInstant neverCompletedInstant =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, "commit",
"001");
HoodieActiveTimeline activeTimeline = mock(HoodieActiveTimeline.class);
HoodieActiveTimeline filteredTimeline = mock(HoodieActiveTimeline.class);
@@ -107,6 +129,18 @@ public class TestIndexingCatchupTask {
return Option.empty();
});
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/some/path")
+
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .build();
+ // Simulate heartbeat exists and not expired
+ when(table.getConfig()).thenReturn(writeConfig);
+ HoodieStorage storage = mock(HoodieStorage.class);
+ when(metaClient.getStorage()).thenReturn(storage);
+ when(metaClient.getBasePath()).thenReturn(new StoragePath("/some/path"));
+ when(storage.exists(any())).thenReturn(true);
+ when(heartbeatClient.isHeartbeatExpired("001")).thenReturn(false);
+
AbstractIndexingCatchupTask task = new DummyIndexingCatchupTask(
metadataWriter,
Collections.singletonList(neverCompletedInstant),
@@ -115,7 +149,9 @@ public class TestIndexingCatchupTask {
metadataMetaClient,
transactionManager,
"001",
- engineContext);
+ engineContext,
+ table,
+ heartbeatClient);
// simulate catchup task timeout
CountDownLatch latch = new CountDownLatch(1);
@@ -135,6 +171,75 @@ public class TestIndexingCatchupTask {
}
}
+ /**
+ * Test case to cover heartbeat expiry. Validate that awaitInstantCaughtUp
+ * returns true when heartbeat has expired for the given instant.
+ */
+ @Test
+ public void testHeartbeatExpired() throws IOException {
+ HoodieInstant expiredInstant =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, "commit",
"002");
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/some/path")
+
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .build();
+ // Simulate heartbeat exists and expired
+ when(table.getConfig()).thenReturn(writeConfig);
+ HoodieStorage storage = mock(HoodieStorage.class);
+ when(metaClient.getStorage()).thenReturn(storage);
+ when(metaClient.getBasePath()).thenReturn(new StoragePath("/some/path"));
+ when(storage.exists(any())).thenReturn(true);
+ when(heartbeatClient.isHeartbeatExpired("002")).thenReturn(true);
+
+ AbstractIndexingCatchupTask task = new DummyIndexingCatchupTask(
+ metadataWriter,
+ Collections.singletonList(expiredInstant),
+ new HashSet<>(),
+ metaClient,
+ metadataMetaClient,
+ transactionManager,
+ "001",
+ engineContext,
+ table,
+ heartbeatClient
+ );
+
+ assertTrue(task.awaitInstantCaughtUp(expiredInstant), "Expected true as
the instant's heartbeat has expired.");
+ }
+
+ /**
+ * Test case to cover the scenario where the heartbeat does not exist for
the given instant.
+ * Validate that awaitInstantCaughtUp returns true when heartbeat does not
exist for the given instant.
+ */
+ @Test
+ public void testNoHeartbeat() throws IOException {
+ HoodieInstant pendingInstantWithNoHeartbeat =
INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.REQUESTED, "commit",
"002");
+ HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+ .withPath("/some/path")
+
.withCleanConfig(HoodieCleanConfig.newBuilder().withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY).build())
+ .build();
+ // Simulate heartbeat does not exist for the instant
+ when(table.getConfig()).thenReturn(writeConfig);
+ HoodieStorage storage = mock(HoodieStorage.class);
+ when(metaClient.getStorage()).thenReturn(storage);
+ when(metaClient.getBasePath()).thenReturn(new StoragePath("/some/path"));
+ when(storage.exists(any())).thenReturn(false);
+
+ AbstractIndexingCatchupTask task = new DummyIndexingCatchupTask(
+ metadataWriter,
+ Collections.singletonList(pendingInstantWithNoHeartbeat),
+ new HashSet<>(),
+ metaClient,
+ metadataMetaClient,
+ transactionManager,
+ "001",
+ engineContext,
+ table,
+ heartbeatClient
+ );
+
+ assertTrue(task.awaitInstantCaughtUp(pendingInstantWithNoHeartbeat),
"Expected true as no heartbeat exists for the instant.");
+ }
+
static class DummyIndexingCatchupTask extends AbstractIndexingCatchupTask {
public DummyIndexingCatchupTask(HoodieTableMetadataWriter metadataWriter,
List<HoodieInstant> instantsToIndex,
@@ -143,8 +248,10 @@ public class TestIndexingCatchupTask {
HoodieTableMetaClient metadataMetaClient,
TransactionManager transactionManager,
String currentCaughtupInstant,
- HoodieEngineContext engineContext) {
- super(metadataWriter, instantsToIndex, metadataCompletedInstants,
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant,
engineContext);
+ HoodieEngineContext engineContext,
+ HoodieTable table,
+ HoodieHeartbeatClient heartbeatClient) {
+ super(metadataWriter, instantsToIndex, metadataCompletedInstants,
metaClient, metadataMetaClient, transactionManager, currentCaughtupInstant,
engineContext, table, heartbeatClient);
}
@Override
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
index 11f6036fc22..dd103ed859d 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieIndexer.java
@@ -39,7 +39,6 @@ import
org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.metadata.HoodieBackedTableMetadata;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
import org.apache.hudi.metadata.MetadataPartitionType;
@@ -291,11 +290,8 @@ public class TestHoodieIndexer extends
SparkClientFunctionalTestHarness implemen
// start the indexer and validate files index is completely built out
HoodieIndexer indexer = new HoodieIndexer(jsc(), config);
- // The catchup won't finish due to inflight delta commit, and this is
expected
- Throwable cause = assertThrows(RuntimeException.class, () ->
indexer.start(0))
- .getCause();
- assertTrue(cause instanceof HoodieMetadataException);
- assertTrue(cause.getMessage().contains("Failed to index partition"));
+ // The catchup must finish even with inflight delta commit
+ assertEquals(0, indexer.start(0));
// Now, make sure that the inflight delta commit happened before the async
indexer
// is intact