This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 4a7b6230b643 feat(metadata): Handle metadata table service failures
gracefully and emit metrics (#17930)
4a7b6230b643 is described below
commit 4a7b6230b6430168b709ddd57660956ae744d5b0
Author: Surya Prasanna <[email protected]>
AuthorDate: Fri Jan 30 14:23:23 2026 -0800
feat(metadata): Handle metadata table service failures gracefully and emit
metrics (#17930)
---
.../metadata/HoodieBackedTableMetadataWriter.java | 77 ++++++++----
.../TestHoodieBackedTableMetadataWriter.java | 132 ++++++++++++++++++++-
.../hudi/common/config/HoodieMetadataConfig.java | 17 +++
.../hudi/metadata/HoodieMetadataMetrics.java | 3 +
.../common/config/TestHoodieMetadataConfig.java | 40 +++----
5 files changed, 221 insertions(+), 48 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index ec665d883e53..0d8905731062 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -2003,7 +2003,8 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
BaseHoodieWriteClient<?, I, ?, O> writeClient = getWriteClient();
try {
// Run any pending table services operations and return the active
timeline
- HoodieActiveTimeline activeTimeline =
runPendingTableServicesOperationsAndRefreshTimeline(metadataMetaClient,
writeClient, requiresTimelineRefresh);
+ HoodieActiveTimeline activeTimeline =
runPendingTableServicesOperationsAndRefreshTimeline(
+ metadataMetaClient, writeClient, requiresTimelineRefresh, metrics);
Option<HoodieInstant> lastInstant =
activeTimeline.getDeltaCommitTimeline()
.filterCompletedInstants()
@@ -2024,7 +2025,9 @@ public abstract class HoodieBackedTableMetadataWriter<I,
O> implements HoodieTab
} catch (Exception e) {
LOG.error("Exception in running table services on metadata table", e);
allTableServicesExecutedSuccessfullyOrSkipped = false;
- throw e;
+ if
(dataWriteConfig.getMetadataConfig().shouldFailOnTableServiceFailures()) {
+ throw e;
+ }
} finally {
String metadataTableName = writeClient.getConfig().getTableName();
boolean tableNameExists = StringUtils.nonEmpty(metadataTableName);
@@ -2046,19 +2049,26 @@ public abstract class
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
static HoodieActiveTimeline
runPendingTableServicesOperationsAndRefreshTimeline(HoodieTableMetaClient
metadataMetaClient,
BaseHoodieWriteClient<?, ?, ?, ?> writeClient,
-
boolean initialTimelineRequiresRefresh) {
- HoodieActiveTimeline activeTimeline = initialTimelineRequiresRefresh ?
metadataMetaClient.reloadActiveTimeline() :
metadataMetaClient.getActiveTimeline();
- // finish off any pending log compaction or compactions operations if any
from previous attempt.
- boolean ranServices = false;
- if (activeTimeline.filterPendingCompactionTimeline().countInstants() > 0) {
- writeClient.runAnyPendingCompactions();
- ranServices = true;
- }
- if (activeTimeline.filterPendingLogCompactionTimeline().countInstants() >
0) {
- writeClient.runAnyPendingLogCompactions();
- ranServices = true;
+
boolean initialTimelineRequiresRefresh,
+
Option<HoodieMetadataMetrics> metricsOption) {
+ try {
+ HoodieActiveTimeline activeTimeline = initialTimelineRequiresRefresh ?
metadataMetaClient.reloadActiveTimeline() :
metadataMetaClient.getActiveTimeline();
+ // finish off any pending log compaction or compactions operations if
any from previous attempt.
+ boolean ranServices = false;
+ if (activeTimeline.filterPendingCompactionTimeline().countInstants() >
0) {
+ writeClient.runAnyPendingCompactions();
+ ranServices = true;
+ }
+ if (activeTimeline.filterPendingLogCompactionTimeline().countInstants()
> 0) {
+ writeClient.runAnyPendingLogCompactions();
+ ranServices = true;
+ }
+ return ranServices ? metadataMetaClient.reloadActiveTimeline() :
activeTimeline;
+ } catch (Exception e) {
+ metricsOption.ifPresent(m -> m.incrementMetric(
+ HoodieMetadataMetrics.PENDING_COMPACTIONS_FAILURES, 1));
+ throw e;
}
- return ranServices ? metadataMetaClient.reloadActiveTimeline() :
activeTimeline;
}
/**
@@ -2088,18 +2098,35 @@ public abstract class
HoodieBackedTableMetadataWriter<I, O> implements HoodieTab
// let's say we trigger compaction after C5 in MDT and so compaction
completes with C4001. but C5 crashed before completing in MDT.
// and again w/ C6, we will re-attempt compaction at which point latest
delta commit is C4 in MDT.
// and so we try compaction w/ instant C4001. So, we can avoid compaction
if we already have compaction w/ same instant time.
- if
(metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime))
{
- LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
- } else if (writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
- LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
- writeClient.compact(compactionInstantTime, true);
- } else if (metadataWriteConfig.isLogCompactionEnabled()) {
- // Schedule and execute log compaction with new instant time.
- Option<String> scheduledLogCompaction =
writeClient.scheduleLogCompaction(Option.empty());
- if (scheduledLogCompaction.isPresent()) {
- LOG.info("Log compaction is scheduled for timestamp {}",
scheduledLogCompaction.get());
- writeClient.logCompact(scheduledLogCompaction.get(), true);
+ boolean skipCompactions =
metadataMetaClient.getActiveTimeline().filterCompletedInstants().containsInstant(compactionInstantTime);
+ try {
+ if (skipCompactions) {
+ LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
+ } else if
(writeClient.scheduleCompactionAtInstant(compactionInstantTime,
Option.empty())) {
+ LOG.info("Compaction is scheduled for timestamp {}",
compactionInstantTime);
+ writeClient.compact(compactionInstantTime, true);
}
+ } catch (Exception e) {
+ metrics.ifPresent(m ->
m.incrementMetric(HoodieMetadataMetrics.COMPACTION_FAILURES, 1));
+ LOG.error("Error in scheduling and executing compaction in metadata
table", e);
+ throw e;
+ }
+
+ try {
+ if (skipCompactions) {
+ LOG.info("Compaction with same {} time is already present in the
timeline.", compactionInstantTime);
+ } else if (metadataWriteConfig.isLogCompactionEnabled()) {
+ // Schedule and execute log compaction with new instant time.
+ Option<String> scheduledLogCompaction =
writeClient.scheduleLogCompaction(Option.empty());
+ if (scheduledLogCompaction.isPresent()) {
+ LOG.info("Log compaction is scheduled for timestamp {}",
scheduledLogCompaction.get());
+ writeClient.logCompact(scheduledLogCompaction.get(), true);
+ }
+ }
+ } catch (Exception e) {
+ metrics.ifPresent(m ->
m.incrementMetric(HoodieMetadataMetrics.LOG_COMPACTION_FAILURES, 1));
+ LOG.error("Error in scheduling and executing log compaction in metadata
table", e);
+ throw e;
}
}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
index e583dead5ddc..5b69c1af6528 100644
---
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieBackedTableMetadataWriter.java
@@ -32,28 +32,35 @@ import
org.apache.hudi.common.table.timeline.versioning.v2.ActiveTimelineV2;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCleanConfig;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.storage.StorageConfiguration;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.MethodSource;
import org.mockito.MockedStatic;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.stream.Stream;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.times;
@@ -107,7 +114,8 @@ class TestHoodieBackedTableMetadataWriter {
} else {
expectedResult = initialTimeline;
}
- assertSame(expectedResult,
HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline(metaClient,
writeClient, requiresRefresh));
+ assertSame(expectedResult,
HoodieBackedTableMetadataWriter.runPendingTableServicesOperationsAndRefreshTimeline(
+ metaClient, writeClient, requiresRefresh, Option.empty()));
verify(writeClient, times(hasPendingCompaction ? 1 :
0)).runAnyPendingCompactions();
verify(writeClient, times(hasPendingLogCompaction ? 1 :
0)).runAnyPendingLogCompactions();
@@ -314,4 +322,126 @@ class TestHoodieBackedTableMetadataWriter {
timeline.setInstants(instants);
return timeline;
}
+
+ static Stream<Arguments> performTableServicesFailureTestCases() {
+ return Stream.of(
+ Arguments.of(
+ "compaction",
+ true,
+ false,
+ new RuntimeException("Compaction failed"),
+ true
+ ),
+ Arguments.of(
+ "compaction",
+ true,
+ false,
+ new RuntimeException("Compaction failed"),
+ false
+ ),
+ Arguments.of(
+ "log compaction",
+ false,
+ true,
+ new HoodieException("Log compaction failed"),
+ true
+ ),
+ Arguments.of(
+ "log compaction",
+ false,
+ true,
+ new HoodieException("Log compaction failed"),
+ false
+ )
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("performTableServicesFailureTestCases")
+ void testPerformTableServicesWithFailureHandling(
+ String serviceType,
+ boolean hasPendingCompaction,
+ boolean hasPendingLogCompaction,
+ RuntimeException exceptionToThrow,
+ boolean shouldFailOnTableServiceFailures) throws Exception {
+ // Create mocks for dependencies
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+ HoodieActiveTimeline timeline = mock(HoodieActiveTimeline.class,
RETURNS_DEEP_STUBS);
+ BaseHoodieWriteClient writeClient = mock(BaseHoodieWriteClient.class);
+ HoodieMetadataMetrics metrics = mock(HoodieMetadataMetrics.class);
+ HoodieWriteConfig writeConfig = mock(HoodieWriteConfig.class);
+ HoodieMetadataConfig metadataConfig = mock(HoodieMetadataConfig.class);
+
+ // Set up config mocks
+ when(writeConfig.getMetadataConfig()).thenReturn(metadataConfig);
+
when(metadataConfig.shouldFailOnTableServiceFailures()).thenReturn(shouldFailOnTableServiceFailures);
+ when(writeConfig.getTableName()).thenReturn("test_table");
+
+ // Set up timeline mocks
+ when(metaClient.reloadActiveTimeline()).thenReturn(timeline);
+ when(metaClient.getActiveTimeline()).thenReturn(timeline);
+
when(timeline.filterPendingCompactionTimeline().countInstants()).thenReturn(hasPendingCompaction
? 1 : 0);
+
when(timeline.filterPendingLogCompactionTimeline().countInstants()).thenReturn(hasPendingLogCompaction
? 1 : 0);
+
when(timeline.getDeltaCommitTimeline().filterCompletedInstants().lastInstant()).thenReturn(Option.empty());
+
+ // Set up write client mocks
+ when(writeClient.getConfig()).thenReturn(writeConfig);
+
+ // Simulate failure based on service type
+ if (hasPendingCompaction) {
+ doThrow(exceptionToThrow).when(writeClient).runAnyPendingCompactions();
+ }
+ if (hasPendingLogCompaction) {
+
doThrow(exceptionToThrow).when(writeClient).runAnyPendingLogCompactions();
+ }
+
+ // Create a partial mock of HoodieBackedTableMetadataWriter
+ HoodieBackedTableMetadataWriter writer =
mock(HoodieBackedTableMetadataWriter.class);
+
+ // Mock getWriteClient to return our mock write client
+ when(writer.getWriteClient()).thenReturn(writeClient);
+
+ // Set up the writer's fields using reflection
+ java.lang.reflect.Field metadataMetaClientField =
HoodieBackedTableMetadataWriter.class.getDeclaredField("metadataMetaClient");
+ metadataMetaClientField.setAccessible(true);
+ metadataMetaClientField.set(writer, metaClient);
+
+ java.lang.reflect.Field writeClientField =
HoodieBackedTableMetadataWriter.class.getDeclaredField("writeClient");
+ writeClientField.setAccessible(true);
+ writeClientField.set(writer, writeClient);
+
+ java.lang.reflect.Field writeConfigField =
HoodieBackedTableMetadataWriter.class.getDeclaredField("dataWriteConfig");
+ writeConfigField.setAccessible(true);
+ writeConfigField.set(writer, writeConfig);
+
+ java.lang.reflect.Field metricsField =
HoodieBackedTableMetadataWriter.class.getDeclaredField("metrics");
+ metricsField.setAccessible(true);
+ metricsField.set(writer, Option.of(metrics));
+
+ // Call the real performTableServices method
+ doCallRealMethod().when(writer).performTableServices(any(), eq(true));
+
+ if (shouldFailOnTableServiceFailures) {
+ // When shouldFailOnTableServiceFailures is true, exception should
propagate
+ assertThrows(exceptionToThrow.getClass(), () ->
+ writer.performTableServices(Option.empty(), true),
+ "Expected exception to be thrown when " + serviceType + " fails and
shouldFailOnTableServiceFailures is true");
+ } else {
+ // When shouldFailOnTableServiceFailures is false, exception should not
propagate
+ assertDoesNotThrow(() ->
+ writer.performTableServices(Option.empty(), true),
+ "Exception should not be thrown when
shouldFailOnTableServiceFailures is false");
+ }
+
+ // Verify the appropriate service method was called
+ if (hasPendingCompaction) {
+ verify(writeClient, times(1)).runAnyPendingCompactions();
+ }
+ if (hasPendingLogCompaction) {
+ verify(writeClient, times(1)).runAnyPendingLogCompactions();
+ }
+
+ // Verify metrics are incremented when there's a failure
+ verify(metrics,
times(1)).incrementMetric(HoodieMetadataMetrics.PENDING_COMPACTIONS_FAILURES,
1);
+ }
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 9fa1d0de2343..94e5374d1ebc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -616,6 +616,14 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
+ "honor the set value for number of tasks. If not, number of write
status's from data "
+ "table writes will be used for metadata table record preparation");
+ public static final ConfigProperty<Boolean> FAIL_ON_TABLE_SERVICE_FAILURES =
ConfigProperty
+ .key(METADATA_PREFIX + ".write.fail.on.table.service.failures")
+ .defaultValue(true)
+ .markAdvanced()
+ .sinceVersion("1.2.0")
+ .withDocumentation("When set to true, failures in the metadata
table's "
+ + "table service operations will fail the job");
+
public long getMaxLogFileSize() {
return getLong(MAX_LOG_FILE_SIZE_BYTES_PROP);
}
@@ -898,6 +906,10 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return getBooleanOrDefault(DERIVE_FROM_DATA_TABLE_CLEAN_POLICY);
}
+ public boolean shouldFailOnTableServiceFailures() {
+ return getBooleanOrDefault(FAIL_ON_TABLE_SERVICE_FAILURES);
+ }
+
/**
* Checks if a specific metadata index is marked for dropping based on the
metadata configuration.
* NOTE: Only applicable for secondary indexes (SI) or expression indexes
(EI).
@@ -1210,6 +1222,11 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return this;
}
+ public Builder setFailOnTableServiceFailures(boolean
failOnTableServiceFailures) {
+ metadataConfig.setValue(FAIL_ON_TABLE_SERVICE_FAILURES,
String.valueOf(failOnTableServiceFailures));
+ return this;
+ }
+
public HoodieMetadataConfig build() {
metadataConfig.setDefaultValue(ENABLE,
getDefaultMetadataEnable(engineType));
metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS,
getDefaultColStatsEnable(engineType));
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
index 32754a158725..c7244a30d094 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMetrics.java
@@ -79,6 +79,9 @@ public class HoodieMetadataMetrics implements Serializable {
public static final String TABLE_SERVICE_EXECUTION_STATUS =
"table_service_execution_status";
public static final String TABLE_SERVICE_EXECUTION_DURATION =
"table_service_execution_duration";
public static final String ASYNC_INDEXER_CATCHUP_TIME =
"async_indexer_catchup_time";
+ public static final String COMPACTION_FAILURES = "compaction_failures";
+ public static final String LOG_COMPACTION_FAILURES =
"logcompaction_failures";
+ public static final String PENDING_COMPACTIONS_FAILURES =
"pending_compactions_failures";
private final transient MetricRegistry metricsRegistry;
private final transient Metrics metrics;
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
index 060362fd92be..028c69683ef2 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
@@ -142,37 +142,33 @@ class TestHoodieMetadataConfig {
}
@Test
- void testUseMainTableCleanPolicy() {
+ void testFailOnTableServiceFailures() {
// Test default value
HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().build();
- assertTrue(config.shouldDeriveFromDataTableCleanPolicy());
+ assertTrue(config.shouldFailOnTableServiceFailures());
- // Test setting to false using builder method
+ // Test setting to false via Properties
+ Properties propsFalse = new Properties();
+ propsFalse.put(HoodieMetadataConfig.FAIL_ON_TABLE_SERVICE_FAILURES.key(),
"false");
HoodieMetadataConfig configWithFalse = HoodieMetadataConfig.newBuilder()
- .deriveFromDataTableCleanPolicy(false)
+ .fromProperties(propsFalse)
.build();
- assertFalse(configWithFalse.shouldDeriveFromDataTableCleanPolicy());
+ assertFalse(configWithFalse.shouldFailOnTableServiceFailures());
- // Test setting to true using builder method
- HoodieMetadataConfig configWithTrue = HoodieMetadataConfig.newBuilder()
- .deriveFromDataTableCleanPolicy(true)
+ // Test setting to true via builder method
+ HoodieMetadataConfig configWithBuilder = HoodieMetadataConfig.newBuilder()
+ .setFailOnTableServiceFailures(true)
.build();
- assertTrue(configWithTrue.shouldDeriveFromDataTableCleanPolicy());
+ assertTrue(configWithBuilder.shouldFailOnTableServiceFailures());
- // Test custom value via Properties - false
- Properties propsFalse = new Properties();
-
propsFalse.put(HoodieMetadataConfig.DERIVE_FROM_DATA_TABLE_CLEAN_POLICY.key(),
"false");
- HoodieMetadataConfig configWithPropertiesFalse =
HoodieMetadataConfig.newBuilder()
- .fromProperties(propsFalse)
+ // Test setting to false via builder method
+ HoodieMetadataConfig configWithBuilderFalse =
HoodieMetadataConfig.newBuilder()
+ .setFailOnTableServiceFailures(false)
.build();
-
assertFalse(configWithPropertiesFalse.shouldDeriveFromDataTableCleanPolicy());
+ assertFalse(configWithBuilderFalse.shouldFailOnTableServiceFailures());
- // Test custom value via Properties - true
- Properties propsTrue = new Properties();
-
propsTrue.put(HoodieMetadataConfig.DERIVE_FROM_DATA_TABLE_CLEAN_POLICY.key(),
"true");
- HoodieMetadataConfig configWithPropertiesTrue =
HoodieMetadataConfig.newBuilder()
- .fromProperties(propsTrue)
- .build();
-
assertTrue(configWithPropertiesTrue.shouldDeriveFromDataTableCleanPolicy());
+ // Verify the config key is correct
+ assertEquals("hoodie.metadata.write.fail.on.table.service.failures",
+ HoodieMetadataConfig.FAIL_ON_TABLE_SERVICE_FAILURES.key());
}
}