This is an automated email from the ASF dual-hosted git repository.
pwason pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 73e710de1332 feat(table-services): Emit archival metrics for monitoring and debugging (#18133)
73e710de1332 is described below
commit 73e710de1332d88a6b8f8d751d56d93ee129dbb6
Author: Nada <[email protected]>
AuthorDate: Fri Feb 27 13:45:34 2026 -0500
feat(table-services): Emit archival metrics for monitoring and debugging (#18133)
* Emit archival metrics for common OOM failure
Summary:
The following OutOfMemoryError is a common cause of archival failures:
```
2025-05-19T14:24:21-04:00 INFO [pool-18-thread-1] ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: User class threw exception: java.lang.OutOfMemoryError
    at java.base/java.io.ByteArrayOutputStream.hugeCapacity(ByteArrayOutputStream.java:125)
    at java.base/java.io.ByteArrayOutputStream.grow(ByteArrayOutputStream.java:119)
    at java.base/java.io.ByteArrayOutputStream.ensureCapacity(ByteArrayOutputStream.java:95)
    at java.base/java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:156)
    at java.base/java.io.DataOutputStream.write(DataOutputStream.java:107)
    at java.base/java.io.FilterOutputStream.write(FilterOutputStream.java:108)
    at org.apache.hudi.common.table.log.block.HoodieAvroDataBlock.serializeRecords(HoodieAvroDataBlock.java:133)
    at org.apache.hudi.common.table.log.block.HoodieDataBlock.getContentBytes(HoodieDataBlock.java:116)
    at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlocks(HoodieLogFormatWriter.java:175)
    at org.apache.hudi.common.table.log.HoodieLogFormatWriter.appendBlock(HoodieLogFormatWriter.java:152)
    at org.apache.hudi.client.HoodieTimelineArchiver.writeToFile(HoodieTimelineArchiver.java:692)
    at org.apache.hudi.client.HoodieTimelineArchiver.archive(HoodieTimelineArchiver.java:663)
    at org.apache.hudi.client.HoodieTimelineArchiver.archiveIfRequired(HoodieTimelineArchiver.java:179)
    at org.apache.hudi.client.BaseHoodieTableServiceClient.archive(BaseHoodieTableServiceClient.java:827)
    at org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:963)
    at org.apache.hudi.client.BaseHoodieWriteClient.archive(BaseHoodieWriteClient.java:973)
    at com.uber.hudi.tools.manager.HoodieOperationArchival.runArchival(HoodieOperationArchival.java:61)
    at com.uber.hudi.tools.manager.HoodieOperationArchival.execute(HoodieOperationArchival.java:49)
    at com.uber.hudi.tools.manager.HoodieManager.main(HoodieManager.java:54)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:804)
)
```
Added metrics to capture this failure scenario. Refactored the existing
archival metrics to support this.
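Conceptually, the capture records a counter before rethrowing the error. A stdlib-only sketch of that pattern (the metric key matches the one introduced in this change; the class and method shapes are illustrative, not the actual archiver code, and `Runnable` forces the generic catch to `RuntimeException` rather than `Exception`):
```java
import java.util.HashMap;
import java.util.Map;

public class OomMetricSketch {
  static final String ARCHIVAL_OOM_FAILURE = "archivalOutOfMemory";
  final Map<String, Long> metrics = new HashMap<>();

  int archiveIfRequired(Runnable archiveWork) {
    try {
      archiveWork.run();
      return 1;
    } catch (OutOfMemoryError oom) {
      // Record the failure before rethrowing so callers can still emit the metric.
      metrics.put(ARCHIVAL_OOM_FAILURE, 1L);
      throw oom;
    } catch (RuntimeException e) {
      // Generic failures are keyed by the exception class name.
      metrics.put("archivalFailure." + e.getClass().getSimpleName(), 1L);
      throw e;
    }
  }

  public static void main(String[] args) {
    OomMetricSketch sketch = new OomMetricSketch();
    try {
      sketch.archiveIfRequired(() -> { throw new OutOfMemoryError("simulated"); });
    } catch (OutOfMemoryError expected) {
      // swallowed for the demo
    }
    System.out.println(sketch.metrics.get(ARCHIVAL_OOM_FAILURE)); // prints 1
  }
}
```
Rethrowing preserves the original failure semantics; the metric is read out by the caller after the archiver unwinds.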
Test Plan: Added unit test
Reviewers: bkrishen, jingli, O955 Project Hoodie Project Reviewer: Add blocking reviewers, #hoodie_blocking_reviewers, ureview
Reviewed By: bkrishen, jingli, O955 Project Hoodie Project Reviewer: Add blocking reviewers, #hoodie_blocking_reviewers
Tags: #hudi_0.14, #has_java
JIRA Issues: HUDI-6816
Differential Revision: https://code.uberinternal.com/D17816129
* feat: Emit archival metrics for monitoring and debugging
Add detailed metrics collection during timeline archival to help monitor
and debug archival operations. This includes:
- OOM failure detection and tracking
- General exception tracking with exception class names
- Count of commits being archived (all commits and write commits)
- Archival operation status (success/failure)
The metrics are collected in both TimelineArchiverV1 and TimelineArchiverV2
and emitted through HoodieMetrics for external monitoring systems.
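The hand-off between archiver and metrics system is just a map of metric names to values. A minimal stdlib-only sketch of the consumer side (the `gauges` map stands in for a real gauge registry; the names mirror, but are not, Hudi's actual `HoodieMetrics` API):
```java
import java.util.HashMap;
import java.util.Map;

public class ArchivalMetricsEmitter {
  // Illustrative stand-in for a gauge registry backing a metrics reporter.
  final Map<String, Long> gauges = new HashMap<>();

  void updateArchivalMetrics(Map<String, Long> archivalMetrics) {
    // Prefix every metric with the "archival" action, mirroring how
    // metric names are namespaced per table-service action.
    archivalMetrics.forEach((name, value) -> gauges.put("archival." + name, value));
  }

  public static void main(String[] args) {
    ArchivalMetricsEmitter emitter = new ArchivalMetricsEmitter();
    Map<String, Long> collected = new HashMap<>();
    collected.put("archivalNumAllCommits", 14L);
    collected.put("archivalStatus", 1L);
    emitter.updateArchivalMetrics(collected);
    System.out.println(emitter.gauges.get("archival.archivalNumAllCommits")); // prints 14
  }
}
```
Keeping the archiver's side a plain `Map<String, Long>` means both archiver versions can collect metrics without depending on the metrics subsystem.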
* refactor: Remove HoodieTimelineArchiver concrete implementation
This concrete implementation class doesn't exist in upstream master and was
incorrectly brought back during commit porting. The archiver functionality
is correctly implemented in the versioned archiver classes:
- TimelineArchiverV1 (for 0.x timeline)
- TimelineArchiverV2 (for 1.x LSM timeline)
The interface at client/timeline/HoodieTimelineArchiver.java is the correct
abstraction, and the metrics improvements are properly implemented in both
v1 and v2 archiver classes.
* fix: Update imports and usage for refactored HoodieTimelineArchiver
Fix compilation errors after HoodieTimelineArchiver was refactored:
- Add missing imports for HoodieTimelineArchiver and TimelineArchivers in
BaseHoodieTableServiceClient
- Remove duplicate java.util.Map import in HoodieMetrics
- Update test to use TimelineArchivers factory method instead of direct
instantiation
- Fix HoodieInstant constructor call to include required Comparator
parameter
* refactor: Consolidate archival commit metrics logic and add DELTA_COMMIT support
- Move addArchivalCommitMetrics to ArchivalMetrics utility class for reuse
- Add DELTA_COMMIT_ACTION to write commit metrics (was missing in V2)
- Remove unused emitCleanFailure method from HoodieMetrics
* test: Use Mockito to simulate OOM in archival test instead of creating 500K files
Replace heavy test setup that created 19 commits with 500K HoodieWriteStat
objects each with a lightweight approach using Mockito to mock the
LSMTimelineWriter and throw OOM on write(). This makes the test faster
and more reliable since it doesn't depend on JVM memory settings.
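The Mockito approach (`doThrow(new OutOfMemoryError(...)).when(mockWriter).write(...)`) can also be pictured without Mockito. A stdlib-only sketch of the same fault-injection idea (the `TimelineWriter` interface here is an illustrative stand-in, not Hudi's `LSMTimelineWriter`):
```java
public class OomInjectionSketch {
  // Illustrative stand-in for the writer interface the archiver calls into.
  public interface TimelineWriter {
    void write(Object instants);
  }

  public static void main(String[] args) {
    // Stub that always throws, playing the role of the Mockito mock.
    TimelineWriter failing = instants -> { throw new OutOfMemoryError("Simulated OOM"); };
    boolean threw = false;
    try {
      failing.write(new Object());
    } catch (OutOfMemoryError expected) {
      threw = true;
    }
    System.out.println(threw); // prints true
  }
}
```
Either way, the point is the same: the test exercises the archiver's error-handling path deterministically instead of relying on actually exhausting the heap.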
* feat: Add clean and rollback metrics to archival and add test for mixed action types
- Add ARCHIVAL_NUM_CLEAN_COMMITS and ARCHIVAL_NUM_ROLLBACK_COMMITS metrics
- Update addArchivalCommitMetrics to record counts for clean and rollback actions
- Add test to verify archival metrics with mixed action types (commits, replace commits, cleans, rollbacks)
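The counting these bullets describe reduces to grouping the archived instants by their action string. A stdlib-only sketch (the action strings match Hudi's `HoodieTimeline` constants; the `Instant` record and method names are illustrative):
```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ActionCountSketch {
  // Illustrative stand-in for a completed HoodieInstant's action field.
  public record Instant(String action) {}

  static final Set<String> WRITE_ACTIONS = Set.of("commit", "deltacommit", "replacecommit");

  public static Map<String, Long> count(List<Instant> instants) {
    Map<String, Long> metrics = new HashMap<>();
    metrics.put("archivalNumAllCommits", (long) instants.size());
    metrics.put("archivalNumWriteCommits",
        instants.stream().filter(i -> WRITE_ACTIONS.contains(i.action())).count());
    metrics.put("archivalNumCleanCommits",
        instants.stream().filter(i -> "clean".equals(i.action())).count());
    metrics.put("archivalNumRollbackCommits",
        instants.stream().filter(i -> "rollback".equals(i.action())).count());
    return metrics;
  }

  public static void main(String[] args) {
    Map<String, Long> m = count(List.of(
        new Instant("commit"), new Instant("deltacommit"),
        new Instant("clean"), new Instant("rollback"), new Instant("rollback")));
    System.out.println(m.get("archivalNumRollbackCommits")); // prints 2
  }
}
```
With this grouping, the write, clean, and rollback counts partition the total, which is exactly the invariant the mixed-action test asserts.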
---
.../hudi/client/BaseHoodieTableServiceClient.java | 7 +-
.../client/timeline/HoodieTimelineArchiver.java | 10 ++
.../timeline/versioning/v1/TimelineArchiverV1.java | 26 +++-
.../timeline/versioning/v2/TimelineArchiverV2.java | 27 +++-
.../apache/hudi/client/utils/ArchivalMetrics.java | 75 +++++++++++
.../org/apache/hudi/metrics/HoodieMetrics.java | 8 ++
.../apache/hudi/io/TestHoodieTimelineArchiver.java | 141 +++++++++++++++++++++
7 files changed, 291 insertions(+), 3 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index b505ee60c40e..cb1777ad2adf 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -897,9 +897,10 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
     }
     final Timer.Context timerContext = metrics.getArchiveCtx();
     int instantsToArchive = 0;
+    HoodieTimelineArchiver archiver = null;
     try {
       // We cannot have unbounded commit files. Archive commits if we have to archive.
-      HoodieTimelineArchiver archiver = TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(), config, table);
+      archiver = TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(), config, table);
       instantsToArchive = archiver.archiveIfRequired(context, true);
     } catch (IOException ioe) {
       throw new HoodieIOException("Failed to archive", ioe);
@@ -908,6 +909,10 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl
       long durationMs = metrics.getDurationInMs(timerContext.stop());
       this.metrics.updateArchiveMetrics(durationMs, instantsToArchive);
     }
+    // Emit additional archival metrics (OOM tracking, failure tracking, etc.)
+    if (archiver != null) {
+      this.metrics.updateArchivalMetrics(archiver.getMetrics());
+    }
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
index b1c1295cec52..6bd839405dee 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/HoodieTimelineArchiver.java
@@ -23,6 +23,8 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
 
 import java.io.IOException;
+import java.util.Collections;
+import java.util.Map;
 
 /**
  * Archiver to bound the growth of files under .hoodie meta path.
@@ -37,4 +39,12 @@ public interface HoodieTimelineArchiver<T extends HoodieAvroPayload, I, K, O> {
    * Check if commits need to be archived. If yes, archive commits.
    */
   int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) throws IOException;
+
+  /**
+   * Returns metrics collected during archival.
+   * Keys are metric names, values are metric values.
+   */
+  default Map<String, Long> getMetrics() {
+    return Collections.emptyMap();
+  }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
index b78597927f29..e693a1c2e7fc 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java
@@ -22,6 +22,7 @@ package org.apache.hudi.client.timeline.versioning.v1;
 import org.apache.hudi.avro.model.HoodieArchivedMetaEntry;
 import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
 import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.ArchivalMetrics;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieArchivedLogFile;
 import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
@@ -96,6 +97,7 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
   private final HoodieTable<T, I, K, O> table;
   private final HoodieTableMetaClient metaClient;
   private final TransactionManager txnManager;
+  private final Map<String, Long> metrics;
 
   public TimelineArchiverV1(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
     this.config = config;
@@ -106,6 +108,7 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
     Pair<Integer, Integer> minAndMaxInstants = getMinAndMaxInstantsToKeep(table, metaClient);
     this.minInstantsToKeep = minAndMaxInstants.getLeft();
     this.maxInstantsToKeep = minAndMaxInstants.getRight();
+    this.metrics = new HashMap<>();
   }
 
   private Writer openWriter(StoragePath archivePath) {
@@ -141,17 +144,27 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
         txnManager.beginStateChange(Option.empty(), Option.empty());
       }
       List<HoodieInstant> instantsToArchive = getInstantsToArchive();
+      addArchivalCommitMetrics(instantsToArchive);
+      boolean success = true;
       if (!instantsToArchive.isEmpty()) {
         this.writer = openWriter(archiveFilePath.getParent());
         log.info("Archiving instants {} for table {}", instantsToArchive, config.getBasePath());
         archive(context, instantsToArchive);
         log.info("Deleting archived instants {} for table {}", instantsToArchive, config.getBasePath());
-        deleteArchivedInstants(instantsToArchive, context);
+        success = deleteArchivedInstants(instantsToArchive, context);
       } else {
         log.info("No Instants to archive for table {}", config.getBasePath());
       }
+      metrics.put(ArchivalMetrics.ARCHIVAL_STATUS, success ? 1L : -1L);
       return instantsToArchive.size();
+    } catch (OutOfMemoryError oom) {
+      metrics.put(ArchivalMetrics.ARCHIVAL_OOM_FAILURE, 1L);
+      throw oom;
+    } catch (Exception e) {
+      String failureMetricName = String.join(".", ArchivalMetrics.ARCHIVAL_FAILURE, e.getClass().getSimpleName());
+      metrics.put(failureMetricName, 1L);
+      throw e;
     } finally {
       close();
       if (acquireLock) {
@@ -160,6 +173,17 @@ public class TimelineArchiverV1<T extends HoodieAvroPayload, I, K, O> implements
     }
   }
 
+  @Override
+  public Map<String, Long> getMetrics() {
+    return metrics;
+  }
+
+  private void addArchivalCommitMetrics(List<HoodieInstant> instantsToArchive) {
+    ArchivalMetrics.addArchivalCommitMetrics(
+        instantsToArchive.stream().filter(HoodieInstant::isCompleted),
+        metrics);
+  }
+
   /**
    * Keeping for downgrade from 1.x LSM archived timeline.
    */
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
index 5242bf58b7a7..ebcd16e2361e 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java
@@ -21,6 +21,7 @@ package org.apache.hudi.client.timeline.versioning.v2;
 import org.apache.hudi.client.timeline.HoodieTimelineArchiver;
 import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.ArchivalMetrics;
 import org.apache.hudi.common.NativeTableFormat;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieAvroPayload;
@@ -52,6 +53,7 @@ import lombok.extern.slf4j.Slf4j;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -78,6 +80,7 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
   private final TransactionManager txnManager;
   private final LSMTimelineWriter timelineWriter;
+  private final Map<String, Long> metrics;
 
   public TimelineArchiverV2(HoodieWriteConfig config, HoodieTable<T, I, K, O> table) {
     this.config = config;
@@ -88,6 +91,7 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
     Pair<Integer, Integer> minAndMaxInstants = getMinAndMaxInstantsToKeep(table, metaClient);
     this.minInstantsToKeep = minAndMaxInstants.getLeft();
     this.maxInstantsToKeep = minAndMaxInstants.getRight();
+    this.metrics = new HashMap<>();
   }
 
   @Override
@@ -105,6 +109,8 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
     try {
       // Sort again because the cleaning and rollback instants could break the sequence.
       List<ActiveAction> instantsToArchive = getInstantsToArchive().sorted().collect(Collectors.toList());
+      addArchivalCommitMetrics(instantsToArchive);
+      boolean success = true;
       if (!instantsToArchive.isEmpty()) {
         log.info("Archiving and deleting instants {}", instantsToArchive);
         Consumer<Exception> exceptionHandler = e -> {
@@ -114,7 +120,7 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
         };
         this.timelineWriter.write(instantsToArchive, Option.of(action -> deleteAnyLeftOverMarkers(context, action)), Option.of(exceptionHandler));
         log.debug("Deleting archived instants");
-        deleteArchivedActions(instantsToArchive, context);
+        success = deleteArchivedActions(instantsToArchive, context);
         // triggers compaction and cleaning only after archiving action
         this.timelineWriter.compactAndClean(context);
         Supplier<List<HoodieInstant>> archivedInstants = () -> instantsToArchive.stream()
@@ -125,7 +131,15 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
       } else {
         log.info("No Instants to archive");
       }
+      metrics.put(ArchivalMetrics.ARCHIVAL_STATUS, success ? 1L : -1L);
       return instantsToArchive.size();
+    } catch (OutOfMemoryError oom) {
+      metrics.put(ArchivalMetrics.ARCHIVAL_OOM_FAILURE, 1L);
+      throw oom;
+    } catch (Exception e) {
+      String failureMetricName = String.join(".", ArchivalMetrics.ARCHIVAL_FAILURE, e.getClass().getSimpleName());
+      metrics.put(failureMetricName, 1L);
+      throw e;
     } finally {
       if (acquireLock) {
         txnManager.endStateChange(Option.empty());
@@ -133,6 +147,17 @@ public class TimelineArchiverV2<T extends HoodieAvroPayload, I, K, O> implements
     }
   }
 
+  @Override
+  public Map<String, Long> getMetrics() {
+    return metrics;
+  }
+
+  private void addArchivalCommitMetrics(List<ActiveAction> instantsToArchive) {
+    ArchivalMetrics.addArchivalCommitMetrics(
+        instantsToArchive.stream().flatMap(action -> action.getCompletedInstants().stream()),
+        metrics);
+  }
+
   private List<HoodieInstant> getCleanAndRollbackInstantsToArchive(HoodieInstant latestCommitInstantToArchive) {
     HoodieTimeline cleanAndRollbackTimeline = table.getActiveTimeline()
         .getTimelineOfActions(CollectionUtils.createSet(HoodieTimeline.CLEAN_ACTION, HoodieTimeline.ROLLBACK_ACTION))
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalMetrics.java
new file mode 100644
index 000000000000..1bc002fad08e
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalMetrics.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Constants and utilities for archival metrics.
+ */
+public final class ArchivalMetrics {
+  public static final String ARCHIVAL_OOM_FAILURE = "archivalOutOfMemory";
+  public static final String ARCHIVAL_NUM_ALL_COMMITS = "archivalNumAllCommits";
+  public static final String ARCHIVAL_NUM_WRITE_COMMITS = "archivalNumWriteCommits";
+  public static final String ARCHIVAL_NUM_CLEAN_COMMITS = "archivalNumCleanCommits";
+  public static final String ARCHIVAL_NUM_ROLLBACK_COMMITS = "archivalNumRollbackCommits";
+  public static final String ARCHIVAL_FAILURE = "archivalFailure";
+  public static final String ARCHIVAL_STATUS = "archivalStatus";
+
+  private static final Set<String> WRITE_COMMIT_ACTIONS = CollectionUtils.createSet(
+      HoodieTimeline.COMMIT_ACTION,
+      HoodieTimeline.DELTA_COMMIT_ACTION,
+      HoodieTimeline.REPLACE_COMMIT_ACTION);
+
+  private ArchivalMetrics() {
+    // Private constructor to prevent instantiation
+  }
+
+  /**
+   * Adds archival commit metrics to the given metrics map based on the completed instants.
+   *
+   * @param completedInstants stream of completed instants to archive
+   * @param metrics           map to populate with archival metrics
+   */
+  public static void addArchivalCommitMetrics(Stream<HoodieInstant> completedInstants, Map<String, Long> metrics) {
+    // Collect to list since we need to iterate multiple times
+    List<HoodieInstant> instantsList = completedInstants.collect(Collectors.toList());
+    metrics.put(ARCHIVAL_NUM_ALL_COMMITS, (long) instantsList.size());
+    long writeCommitCount = instantsList.stream()
+        .filter(instant -> WRITE_COMMIT_ACTIONS.contains(instant.getAction()))
+        .count();
+    metrics.put(ARCHIVAL_NUM_WRITE_COMMITS, writeCommitCount);
+    long cleanCommitCount = instantsList.stream()
+        .filter(instant -> HoodieTimeline.CLEAN_ACTION.equals(instant.getAction()))
+        .count();
+    metrics.put(ARCHIVAL_NUM_CLEAN_COMMITS, cleanCommitCount);
+    long rollbackCommitCount = instantsList.stream()
+        .filter(instant -> HoodieTimeline.ROLLBACK_ACTION.equals(instant.getAction()))
+        .count();
+    metrics.put(ARCHIVAL_NUM_ROLLBACK_COMMITS, rollbackCommitCount);
+  }
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
index 1bd97211cea1..c94087b1b149 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java
@@ -36,6 +36,7 @@ import com.codahale.metrics.Timer;
 import lombok.Getter;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.Map;
 import java.util.Set;
 
 import static org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator.MILLIS_INSTANT_TIMESTAMP_FORMAT_LENGTH;
@@ -361,6 +362,13 @@ public class HoodieMetrics {
     }
   }
 
+  public void updateArchivalMetrics(Map<String, Long> archivalMetrics) {
+    if (config.isMetricsOn()) {
+      log.info(String.format("Sending archival metrics %s", archivalMetrics));
+      archivalMetrics.forEach((metricName, metricValue) -> metrics.registerGauge(getMetricsName("archival", metricName), metricValue));
+    }
+  }
+
   public void updateFinalizeWriteMetrics(long durationInMs, long numFilesFinalized) {
     if (config.isMetricsOn()) {
       log.debug("Sending finalize write metrics ({}={}, {}={})", DURATION_STR, durationInMs,
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
index a6c25b7e9096..c631ec00626c 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java
@@ -22,10 +22,12 @@ import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRollbackMetadata;
 import org.apache.hudi.avro.model.HoodieSavepointMetadata;
 import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.client.timeline.TimelineArchivers;
 import org.apache.hudi.client.WriteClientTestUtils;
 import org.apache.hudi.client.timeline.versioning.v2.LSMTimelineWriter;
 import org.apache.hudi.client.timeline.versioning.v2.TimelineArchiverV2;
 import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
+import org.apache.hudi.client.utils.ArchivalMetrics;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
@@ -88,6 +90,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.lang.reflect.Field;
 import java.time.Instant;
 import java.time.ZoneId;
 import java.time.ZonedDateTime;
@@ -138,7 +141,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
 
 @Slf4j
 public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness {
@@ -2048,4 +2055,138 @@ public class TestHoodieTimelineArchiver extends HoodieSparkClientTestHarness {
       assertEquals(expectedInstant.getState(), actualInstant.getState());
     }
   }
+
+  @Test
+  public void testArchiveWithOOMOnLargeCommitFile() throws Exception {
+    // Initialize table with archival config
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2);
+    writeConfig.setValue(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE, "20");
+
+    // Create commits that will be archived
+    for (int i = 1; i <= 10; i++) {
+      String commitTime = String.format("%08d", i);
+      testTable.doWriteOperation(commitTime, WriteOperationType.UPSERT, Collections.singletonList("p1"),
+          Collections.singletonList("p1"), 1);
+    }
+
+    // Create archiver
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    TimelineArchiverV2 archiver = (TimelineArchiverV2) TimelineArchivers.getInstance(
+        table.getMetaClient().getTimelineLayoutVersion(), writeConfig, table);
+
+    // Use reflection to inject a mock LSMTimelineWriter that throws OOM
+    LSMTimelineWriter mockWriter = mock(LSMTimelineWriter.class);
+    doThrow(new OutOfMemoryError("Simulated OOM")).when(mockWriter).write(any(), any(), any());
+
+    Field writerField = TimelineArchiverV2.class.getDeclaredField("timelineWriter");
+    writerField.setAccessible(true);
+    writerField.set(archiver, mockWriter);
+
+    // Verify that archival throws OOM
+    assertThrows(OutOfMemoryError.class, () -> archiver.archiveIfRequired(context));
+
+    // Verify that OOM metric is recorded
+    Map<String, Long> metrics = archiver.getMetrics();
+    assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_OOM_FAILURE));
+
+    // Verify that commit metrics were recorded before OOM
+    assertTrue(metrics.containsKey(ArchivalMetrics.ARCHIVAL_NUM_ALL_COMMITS));
+  }
+
+  @Test
+  public void testArchivalMetricsWithMixedActionTypes() throws Exception {
+    // Initialize table with archival config: min=2, max=4
+    // This means archival will trigger when we have > 4 write commits and will archive down to 2
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 4, 2);
+
+    Map<String, Integer> cleanStats = new HashMap<>();
+    cleanStats.put("p1", 1);
+
+    // Create a mix of action types in a specific order:
+    // Timeline: C1, C2, C3, C4, CL5, CL6, RB7, RB8, RC9, RC10, C11, C12, C13, C14
+    // Where C=commit, CL=clean, RB=rollback, RC=replace_commit (cluster)
+
+    // Commits 1-4: regular write commits
+    for (int i = 1; i <= 4; i++) {
+      testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT,
+          i == 1 ? Collections.singletonList("p1") : Collections.emptyList(),
+          Collections.singletonList("p1"), 2);
+    }
+
+    // Commits 5-6: clean commits (will be archived along with commits before them)
+    testTable.doClean(String.format("%08d", 5), cleanStats, Collections.emptyMap());
+    testTable.doClean(String.format("%08d", 6), cleanStats, Collections.emptyMap());
+
+    // Commits 7-8: rollback commits
+    testTable.doWriteOperation(String.format("%08d", 7), WriteOperationType.UPSERT,
+        Collections.emptyList(), Collections.singletonList("p1"), 2);
+    testTable.doRollback(String.format("%08d", 7), String.format("%08d", 8));
+
+    // Commits 9-10: replace commits (clustering)
+    testTable.doCluster(String.format("%08d", 9), Collections.emptyMap(), Collections.singletonList("p1"), 2);
+    testTable.doCluster(String.format("%08d", 10), Collections.emptyMap(), Collections.singletonList("p1"), 2);
+
+    // Commits 11-14: more write commits to trigger archival
+    for (int i = 11; i <= 14; i++) {
+      testTable.doWriteOperation(String.format("%08d", i), WriteOperationType.UPSERT,
+          Collections.emptyList(), Collections.singletonList("p1"), 2);
+    }
+
+    // Get timeline before archival
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline beforeArchival = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
+    List<HoodieInstant> instantsBeforeArchival = beforeArchival.getInstants();
+
+    // Create archiver and trigger archival
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    TimelineArchiverV2 archiver = new TimelineArchiverV2(writeConfig, table);
+    int archivedCount = archiver.archiveIfRequired(context);
+
+    // Get timeline after archival
+    metaClient.reloadActiveTimeline();
+    HoodieTimeline afterArchival = metaClient.getActiveTimeline().getAllCommitsTimeline().filterCompletedInstants();
+    List<HoodieInstant> instantsAfterArchival = afterArchival.getInstants();
+
+    // Calculate what was actually archived
+    Set<HoodieInstant> afterSet = new HashSet<>(instantsAfterArchival);
+    List<HoodieInstant> archivedInstants = instantsBeforeArchival.stream()
+        .filter(instant -> !afterSet.contains(instant))
+        .collect(Collectors.toList());
+
+    // Count archived instants by action type
+    long expectedWriteCommits = archivedInstants.stream()
+        .filter(i -> i.getAction().equals(COMMIT_ACTION)
+            || i.getAction().equals(DELTA_COMMIT_ACTION)
+            || i.getAction().equals(REPLACE_COMMIT_ACTION))
+        .count();
+    long expectedCleanCommits = archivedInstants.stream()
+        .filter(i -> i.getAction().equals(CLEAN_ACTION))
+        .count();
+    long expectedRollbackCommits = archivedInstants.stream()
+        .filter(i -> i.getAction().equals(ROLLBACK_ACTION))
+        .count();
+    long expectedTotal = archivedInstants.size();
+
+    // Verify some instants were archived
+    assertTrue(archivedCount > 0, "Expected some instants to be archived");
+
+    // Verify metrics match actual archived counts
+    Map<String, Long> metrics = archiver.getMetrics();
+
+    assertEquals(expectedTotal, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_ALL_COMMITS),
+        "Total archived commits metric should match");
+    assertEquals(expectedWriteCommits, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_WRITE_COMMITS),
+        "Write commits metric should match");
+    assertEquals(expectedCleanCommits, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_CLEAN_COMMITS),
+        "Clean commits metric should match");
+    assertEquals(expectedRollbackCommits, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_ROLLBACK_COMMITS),
+        "Rollback commits metric should match");
+
+    // Verify the sum of individual action types equals total
+    assertEquals(expectedTotal, expectedWriteCommits + expectedCleanCommits + expectedRollbackCommits,
+        "Sum of action types should equal total archived");
+
+    // Verify archival status is success
+    assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_STATUS), "Archival should succeed");
+  }
 }