This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 2f073643dfe7 feat: add graceful handling for post-commit failures with metrics (#18196)
2f073643dfe7 is described below
commit 2f073643dfe764d34fee5a139d95633df01d9292
Author: Surya Prasanna <[email protected]>
AuthorDate: Tue Mar 24 14:05:43 2026 -0700
feat: add graceful handling for post-commit failures with metrics (#18196)
Adds the capability to ignore post-commit failures. By gracefully handling
post-commit failures, we can allow ingestion to complete without failing.
Also adds metrics through which we can track the duration and failures.
---------
Co-authored-by: suryaprasanna <[email protected]>
---
.../apache/hudi/client/BaseHoodieWriteClient.java | 41 ++++++-
.../org/apache/hudi/config/HoodieWriteConfig.java | 16 +++
.../org/apache/hudi/metrics/HoodieMetrics.java | 20 ++++
.../hudi/client/TestSparkRDDWriteClient.java | 131 +++++++++++++++++++++
4 files changed, 202 insertions(+), 6 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 2ec7476f6501..4836c83022a0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -63,6 +63,7 @@ import org.apache.hudi.common.util.CleanerUtils;
import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CommitUtils;
import org.apache.hudi.common.util.Functions;
+import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.ValidationUtils;
@@ -271,7 +272,6 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
extraPreCommitFunc.get().accept(table.getMetaClient(), metadata);
}
commit(table, commitActionType, instantTime, metadata, tableWriteStats,
skipStreamingWritesToMetadataTable);
- postCommit(table, metadata, instantTime, extraMetadata);
log.info("Committed {}", instantTime);
} catch (IOException e) {
throw new HoodieCommitException("Failed to complete commit " +
config.getBasePath() + " at time " + instantTime, e);
@@ -282,8 +282,23 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
// trigger clean and archival.
// Each internal call should ensure to lock if required.
- mayBeCleanAndArchive(table);
- runTableServicesInline(table, metadata, extraMetadata);
+ boolean postCommitStatus = true;
+ HoodieTimer postCommitTimer = HoodieTimer.start();
+ try {
+ postCommit(table, metadata, instantTime, extraMetadata);
+ mayBeCleanAndArchive(table);
+ runTableServicesInline(table, metadata, extraMetadata);
+ } catch (Exception e) {
+ postCommitStatus = false;
+ if (config.canIgnorePostCommitFailures()) {
+        LOG.error("Ignoring exception during post-commit or inline table service processing", e);
+ } else {
+ throw e;
+ }
+ } finally {
+ long duration = postCommitTimer.endTimer();
+ metrics.updatePostCommitMetrics(postCommitStatus, duration);
+ }
emitCommitMetrics(instantTime, metadata, commitActionType);
@@ -616,8 +631,22 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
result.getWriteStats().get().size());
}
- postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime,
Option.empty());
- mayBeCleanAndArchive(hoodieTable);
+ boolean postCommitStatus = true;
+ HoodieTimer postCommitTimer = HoodieTimer.start();
+ try {
+ postCommit(hoodieTable, result.getCommitMetadata().get(), instantTime,
Option.empty());
+ mayBeCleanAndArchive(hoodieTable);
+ } catch (Exception e) {
+ postCommitStatus = false;
+ if (config.canIgnorePostCommitFailures()) {
+        LOG.error("Ignoring exception during post-commit or inline table service processing", e);
+ } else {
+ throw e;
+ }
+ } finally {
+ long duration = postCommitTimer.endTimer();
+ metrics.updatePostCommitMetrics(postCommitStatus, duration);
+ }
emitCommitMetrics(instantTime, result.getCommitMetadata().get(),
hoodieTable.getMetaClient().getCommitActionType());
}
@@ -634,7 +663,7 @@ public abstract class BaseHoodieWriteClient<T, I, K, O>
extends BaseHoodieClient
*/
protected void postCommit(HoodieTable table, HoodieCommitMetadata metadata,
String instantTime, Option<Map<String, String>> extraMetadata) {
try {
-      context.setJobStatus(this.getClass().getSimpleName(),"Cleaning up marker directories for commit " + instantTime + " in table "
+      context.setJobStatus(this.getClass().getSimpleName(), "Cleaning up marker directories for commit " + instantTime + " in table "
+ config.getTableName());
// Delete the marker directory for the instant.
WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index d7b5f3d938a9..9d32ec8b7d22 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -731,6 +731,13 @@ public class HoodieWriteConfig extends HoodieConfig {
.markAdvanced()
.withDocumentation("");
+ public static final ConfigProperty<Boolean> CAN_IGNORE_POST_COMMIT_FAILURES
= ConfigProperty
+ .key("hoodie.write.can.ignore.post.commit.failures")
+ .defaultValue(false)
+ .withAlternatives("hoodie.post.commit.failures.ignored")
+      .withDocumentation("When this config is true, any failures in post-commit operations are"
+          + " ignored and do not kill the application.");
+
public static final ConfigProperty<String>
AVRO_EXTERNAL_SCHEMA_TRANSFORMATION_ENABLE = ConfigProperty
.key(AVRO_SCHEMA_STRING.key() + ".external.transformation")
.defaultValue("false")
@@ -2804,6 +2811,10 @@ public class HoodieWriteConfig extends HoodieConfig {
return getBoolean(BLOCK_WRITES_ON_SPECULATIVE_EXECUTION);
}
+ public boolean canIgnorePostCommitFailures() {
+ return getBoolean(CAN_IGNORE_POST_COMMIT_FAILURES);
+ }
+
/**
* Are any table services configured to run inline for both scheduling and
execution?
*
@@ -3483,6 +3494,11 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
+ public Builder withCanIgnorePostCommitFailures(boolean
canIgnorePostCommitFailures) {
+ writeConfig.setValue(CAN_IGNORE_POST_COMMIT_FAILURES,
String.valueOf(canIgnorePostCommitFailures));
+ return this;
+ }
+
public Builder withPopulateMetaFields(boolean populateMetaFields) {
writeConfig.setValue(HoodieTableConfig.POPULATE_META_FIELDS,
Boolean.toString(populateMetaFields));
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 0decbe290c3d..c777e842bbd3 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -86,6 +86,7 @@ public class HoodieMetrics {
public static final String PENDING_COMPACTION_INSTANT_COUNT_STR =
"pendingCompactionInstantCount";
public static final String PENDING_CLEAN_INSTANT_COUNT_STR =
"pendingCleanInstantCount";
public static final String PENDING_ROLLBACK_INSTANT_COUNT_STR =
"pendingRollbackInstantCount";
+ public static final String POST_COMMIT_STR = "postCommit";
public static final String SUCCESS_EXTENSION = ".success";
public static final String FAILURE_EXTENSION = ".failure";
@@ -147,6 +148,10 @@ public class HoodieMetrics {
private Counter compactionRequestedCounter = null;
private Counter compactionCompletedCounter = null;
private Counter rollbackFailureCounter = null;
+ private Counter postCommitSuccessCounter = null;
+ private Counter postCommitFailureCounter = null;
+ private String postCommitSuccessCounterName = null;
+ private String postCommitFailureCounterName = null;
public HoodieMetrics(HoodieWriteConfig config, HoodieStorage storage) {
this.config = config;
@@ -174,6 +179,8 @@ public class HoodieMetrics {
this.compactionRequestedCounterName =
getMetricsName(HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.REQUESTED_COMPACTION_SUFFIX + COUNTER_METRIC_EXTENSION);
this.compactionCompletedCounterName =
getMetricsName(HoodieTimeline.COMPACTION_ACTION,
HoodieTimeline.COMPLETED_COMPACTION_SUFFIX + COUNTER_METRIC_EXTENSION);
this.rollbackFailureCounterName = getMetricsName("rollback",
FAILURE_COUNTER);
+ this.postCommitSuccessCounterName = getMetricsName(POST_COMMIT_STR,
SUCCESS_COUNTER);
+ this.postCommitFailureCounterName = getMetricsName(POST_COMMIT_STR,
FAILURE_COUNTER);
}
}
@@ -358,6 +365,19 @@ public class HoodieMetrics {
}
}
+ public void updatePostCommitMetrics(boolean status, long durationInMs) {
+ if (config.isMetricsOn()) {
+ if (status) {
+ postCommitSuccessCounter = getCounter(postCommitSuccessCounter,
postCommitSuccessCounterName);
+ postCommitSuccessCounter.inc();
+ } else {
+ postCommitFailureCounter = getCounter(postCommitFailureCounter,
postCommitFailureCounterName);
+ postCommitFailureCounter.inc();
+ }
+ metrics.registerGauge(getMetricsName(POST_COMMIT_STR, DURATION_STR),
durationInMs);
+ }
+ }
+
public void updateCleanMetrics(long durationInMs, int numFilesDeleted) {
if (config.isMetricsOn()) {
log.info(
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
index 84c8eaa9eff9..7619e70ed126 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
@@ -19,6 +19,7 @@
package org.apache.hudi.client;
+import org.apache.hudi.client.common.HoodieSparkEngineContext;
import org.apache.hudi.client.embedded.EmbeddedTimelineService;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
@@ -34,8 +35,13 @@ import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.data.HoodieJavaRDD;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.metrics.Metrics;
+import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Gauge;
+import com.codahale.metrics.MetricRegistry;
import org.apache.avro.generic.GenericRecord;
import org.apache.spark.api.java.JavaRDD;
import org.junit.jupiter.api.Test;
@@ -53,9 +59,13 @@ import java.util.stream.Collectors;
import java.util.stream.Stream;
import static
org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+import static org.apache.hudi.metrics.HoodieMetrics.DURATION_STR;
+import static org.apache.hudi.metrics.HoodieMetrics.FAILURE_COUNTER;
+import static org.apache.hudi.metrics.HoodieMetrics.POST_COMMIT_STR;
import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -311,4 +321,125 @@ class TestSparkRDDWriteClient extends
SparkClientFunctionalTestHarness {
// Reset speculation config after test
jsc().sc().conf().set("spark.speculation", "false");
}
+
+ @Test
+ public void testPostCommitFailureHandlingWithMetrics() throws IOException {
+ HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(),
URI.create(basePath()).getPath(), new Properties());
+
+ // Create a custom write client that throws exception during post commit
operations
+ class PostCommitFailingWriteClient extends SparkRDDWriteClient {
+ private boolean shouldFailPostCommit = false;
+
+ public PostCommitFailingWriteClient(HoodieSparkEngineContext context,
HoodieWriteConfig writeConfig) {
+ super(context, writeConfig);
+ }
+
+ public void setShouldFailPostCommit(boolean shouldFail) {
+ this.shouldFailPostCommit = shouldFail;
+ }
+
+ @Override
+ protected void mayBeCleanAndArchive(HoodieTable table) {
+ if (shouldFailPostCommit) {
+ throw new RuntimeException("Simulated post commit failure for
testing");
+ }
+ super.mayBeCleanAndArchive(table);
+ }
+ }
+
+ // Test with post commit failures ignored
+ HoodieWriteConfig configWithIgnore = getConfigBuilder(true)
+ .withPath(metaClient.getBasePath())
+ .withCanIgnorePostCommitFailures(true)
+
.withMetricsConfig(org.apache.hudi.config.metrics.HoodieMetricsConfig.newBuilder()
+ .on(true)
+ .withReporterType("INMEMORY")
+ .build())
+ .build();
+
+ HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
+ String instant1 = getCommitTimeAtUTC(1);
+
+ PostCommitFailingWriteClient clientWithIgnore = new
PostCommitFailingWriteClient(context(), configWithIgnore);
+ clientWithIgnore.setShouldFailPostCommit(true);
+
+ // Generate and write records - this should succeed even though postCommit
fails
+ List<HoodieRecord> records1 = dataGen.generateInserts(instant1, 10);
+ JavaRDD<HoodieRecord> writeRecords1 = jsc().parallelize(records1, 2);
+
+ WriteClientTestUtils.startCommitWithTime(clientWithIgnore, instant1);
+ List<WriteStatus> writeStatuses1 = clientWithIgnore.insert(writeRecords1,
instant1).collect();
+ assertNoWriteErrors(writeStatuses1);
+
+ // The commit should succeed despite postCommit failure because we have
ignore flag enabled
+ clientWithIgnore.commitStats(instant1,
writeStatuses1.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(), metaClient.getCommitActionType());
+
+ // Verify metrics were updated for failure
+ Metrics metrics = Metrics.getInstance(configWithIgnore.getMetricsConfig(),
hoodieStorage());
+ MetricRegistry registry = metrics.getRegistry();
+
+ // Build metric names correctly using HoodieMetrics.getMetricsName pattern
+ String failureMetricName =
clientWithIgnore.getMetrics().getMetricsName(POST_COMMIT_STR, FAILURE_COUNTER);
+ String durationMetricName =
clientWithIgnore.getMetrics().getMetricsName(POST_COMMIT_STR, DURATION_STR);
+
+ Counter failureCounter = registry.getCounters().get(failureMetricName);
+ Gauge<Long> durationGauge = (Gauge<Long>)
registry.getGauges().get(durationMetricName);
+
+ assertNotNull(failureCounter, "Failure metric should be registered");
+ assertEquals(1L, failureCounter.getCount(), "Failure count should be 1");
+ assertNotNull(durationGauge, "Duration metric should be registered");
+ assertTrue(durationGauge.getValue() >= 0, "Duration should be
non-negative");
+
+ clientWithIgnore.close();
+ metrics.shutdown();
+
+ // Test with post commit failures NOT ignored (should throw exception)
+ HoodieWriteConfig configWithoutIgnore = getConfigBuilder(true)
+ .withPath(metaClient.getBasePath())
+ .withCanIgnorePostCommitFailures(false)
+
.withMetricsConfig(org.apache.hudi.config.metrics.HoodieMetricsConfig.newBuilder()
+ .on(true)
+ .withReporterType("INMEMORY")
+ .build())
+ .build();
+
+ String instant2 = getCommitTimeAtUTC(2);
+ PostCommitFailingWriteClient clientWithoutIgnore = new
PostCommitFailingWriteClient(context(), configWithoutIgnore);
+ clientWithoutIgnore.setShouldFailPostCommit(true);
+
+ List<HoodieRecord> records2 = dataGen.generateInserts(instant2, 10);
+ JavaRDD<HoodieRecord> writeRecords2 = jsc().parallelize(records2, 2);
+
+ WriteClientTestUtils.startCommitWithTime(clientWithoutIgnore, instant2);
+ List<WriteStatus> writeStatuses2 =
clientWithoutIgnore.insert(writeRecords2, instant2).collect();
+ assertNoWriteErrors(writeStatuses2);
+
+ // This should throw RuntimeException because ignore flag is false
+ RuntimeException exception = assertThrows(RuntimeException.class, () -> {
+ clientWithoutIgnore.commitStats(instant2,
writeStatuses2.stream().map(WriteStatus::getStat).collect(Collectors.toList()),
+ Option.empty(), metaClient.getCommitActionType());
+ });
+
+ assertTrue(exception.getMessage().contains("Simulated post commit failure
for testing"),
+ "Exception message should contain the simulated failure message");
+
+ // Verify metrics were still updated even when exception was thrown
+ Metrics metrics2 =
Metrics.getInstance(configWithoutIgnore.getMetricsConfig(), hoodieStorage());
+ MetricRegistry registry2 = metrics2.getRegistry();
+
+ String failureMetricName2 =
clientWithoutIgnore.getMetrics().getMetricsName(POST_COMMIT_STR,
FAILURE_COUNTER);
+ String durationMetricName2 =
clientWithoutIgnore.getMetrics().getMetricsName(POST_COMMIT_STR, DURATION_STR);
+
+ Counter failureCounter2 = registry2.getCounters().get(failureMetricName2);
+ Gauge<Long> durationGauge2 = (Gauge<Long>)
registry2.getGauges().get(durationMetricName2);
+
+ assertNotNull(failureCounter2, "Failure metric should be registered for
second test");
+ assertEquals(1L, failureCounter2.getCount(), "Failure count should be 1");
+ assertNotNull(durationGauge2, "Duration metric should be registered for
second test");
+ assertTrue(durationGauge2.getValue() >= 0, "Duration should be
non-negative");
+
+ clientWithoutIgnore.close();
+ metrics2.shutdown();
+ }
}