nsivabalan commented on code in PR #18133:
URL: https://github.com/apache/hudi/pull/18133#discussion_r2790410654
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java:
##########
@@ -2048,4 +2053,51 @@ private void assertInstantListEquals(List<HoodieInstant> expected, List<HoodieIn
assertEquals(expectedInstant.getState(), actualInstant.getState());
}
}
+
+ @Test
+ public void testArchiveWithOOMOnLargeCommitFile() throws Exception {
+ // Initialize table with archival config
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2);
+ writeConfig.setValue(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE, "20");
+
+ // Create multiple large commit files by adding many write stats
+ for (int i = 1; i < 20; i++) {
+ String largeCommitTime = String.format("%08d", i);
+ testTable.addInflightCommit(largeCommitTime);
+ HoodieCommitMetadata largeCommitMetadata = new HoodieCommitMetadata();
+ largeCommitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA);
+
+ // Add 500k write stats to simulate a large commit file
+ for (int j = 0; j < 500000; j++) {
Review Comment:
do we really need to create 500K file entries in a commit metadata?
can we just mock or extend the TimelineArchiver and throw an OOM when `getCommitInstantsToArchive()` is called?
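For illustration, a Mockito-based sketch of that suggestion, assuming `getCommitInstantsToArchive()` is non-private/overridable and that `TimelineArchiverV2` can be constructed from `(writeConfig, table)` — both are assumptions, not the actual API:
```java
// Spy on a real archiver and force an OOM from the instant-selection step,
// so no oversized commit metadata is needed.
TimelineArchiverV2 archiver = Mockito.spy(new TimelineArchiverV2(writeConfig, table));
Mockito.doThrow(new OutOfMemoryError("simulated OOM"))
    .when(archiver).getCommitInstantsToArchive();

// The OOM should propagate and the failure metric should still be recorded.
assertThrows(OutOfMemoryError.class, () -> archiver.archiveIfRequired(context));
assertEquals(1L, archiver.getMetrics().get(ArchivalMetrics.ARCHIVAL_OOM_FAILURE));
```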
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metrics/HoodieMetrics.java:
##########
@@ -360,6 +361,19 @@ public void updateArchiveMetrics(long durationInMs, int numInstantsArchived) {
}
}
+ public void updateArchivalMetrics(Map<String, Long> archivalMetrics) {
+ if (config.isMetricsOn()) {
+ log.info(String.format("Sending archival metrics %s", archivalMetrics));
+ archivalMetrics.forEach((metricName, metricValue) -> metrics.registerGauge(getMetricsName("archival", metricName), metricValue));
+ }
+ }
+
+ public void emitCleanFailure() {
Review Comment:
was this supposed to be in a different patch?
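(For context on `updateArchivalMetrics` above, a hypothetical call site — the actual wiring isn't shown in this hunk, and the `hoodieMetrics` handle is an assumption:)
```java
// Hypothetical wiring: after archival runs, push the archiver's collected
// counters into HoodieMetrics as gauges via the new hook.
HoodieTimelineArchiver archiver = TimelineArchivers.getInstance(
    table.getMetaClient().getTimelineLayoutVersion(), writeConfig, table);
archiver.archiveIfRequired(context);
hoodieMetrics.updateArchivalMetrics(archiver.getMetrics());
```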
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalMetrics.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+/**
+ * Constants for archival metrics.
+ */
+public final class ArchivalMetrics {
Review Comment:
can we name this as `ArchivalMetricsConstants`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java:
##########
@@ -125,14 +131,40 @@ public int archiveIfRequired(HoodieEngineContext context, boolean acquireLock) t
} else {
log.info("No Instants to archive");
}
+ metrics.put(ArchivalMetrics.ARCHIVAL_STATUS, success ? 1L : -1L);
return instantsToArchive.size();
+ } catch (OutOfMemoryError oom) {
+ metrics.put(ArchivalMetrics.ARCHIVAL_OOM_FAILURE, 1L);
+ throw oom;
+ } catch (Exception e) {
+ String failureMetricName = String.join(".", ArchivalMetrics.ARCHIVAL_FAILURE, e.getClass().getSimpleName());
+ metrics.put(failureMetricName, 1L);
+ throw e;
} finally {
if (acquireLock) {
txnManager.endStateChange(Option.empty());
}
}
}
+ @Override
+ public Map<String, Long> getMetrics() {
+ return metrics;
+ }
+
+ private void addArchivalCommitMetrics(List<ActiveAction> instantsToArchive) {
Review Comment:
we should move this to a common place and reuse it across both V1 and V2
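One possible shape for that shared helper (the class name is hypothetical, and it assumes `ActiveAction` exposes `getAction()`):
```java
import java.util.List;
import java.util.Map;

// Hypothetical shared utility that both TimelineArchiverV1 and TimelineArchiverV2
// could call instead of each keeping its own copy of the counting logic.
public final class ArchivalCommitMetricsUtils {
  private ArchivalCommitMetricsUtils() {
  }

  public static void addArchivalCommitMetrics(Map<String, Long> metrics,
                                              List<ActiveAction> instantsToArchive) {
    metrics.put(ArchivalMetrics.ARCHIVAL_NUM_ALL_COMMITS, (long) instantsToArchive.size());
    // Count plain write commits separately (assumes ActiveAction#getAction()).
    long writeCommits = instantsToArchive.stream()
        .filter(action -> HoodieTimeline.COMMIT_ACTION.equals(action.getAction()))
        .count();
    metrics.put(ArchivalMetrics.ARCHIVAL_NUM_WRITE_COMMITS, writeCommits);
  }
}
```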
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java:
##########
@@ -2048,4 +2053,51 @@ private void assertInstantListEquals(List<HoodieInstant> expected, List<HoodieIn
assertEquals(expectedInstant.getState(), actualInstant.getState());
}
}
+
+ @Test
+ public void testArchiveWithOOMOnLargeCommitFile() throws Exception {
+ // Initialize table with archival config
+ HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2);
+ writeConfig.setValue(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE, "20");
+
+ // Create multiple large commit files by adding many write stats
+ for (int i = 1; i < 20; i++) {
+ String largeCommitTime = String.format("%08d", i);
+ testTable.addInflightCommit(largeCommitTime);
+ HoodieCommitMetadata largeCommitMetadata = new HoodieCommitMetadata();
+ largeCommitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA);
+
+ // Add 500k write stats to simulate a large commit file
+ for (int j = 0; j < 500000; j++) {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setPartitionPath("p1");
+ writeStat.setPath("p1/file_" + j);
+ writeStat.setFileId("file_" + j);
+ writeStat.setTotalWriteBytes(1);
+ writeStat.setFileSizeInBytes(1);
+ largeCommitMetadata.addWriteStat("p1", writeStat);
+ }
+
+ // Save the large commit
+ metaClient.getActiveTimeline().saveAsComplete(
+ new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, largeCommitTime, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+ Option.of(largeCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ metaClient.reloadActiveTimeline();
+ }
+
+ // Create archiver and attempt archival
+ HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+ HoodieTimelineArchiver archiver = TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(), writeConfig, table);
+
+ // Verify that archival throws OOM
+ assertThrows(OutOfMemoryError.class, () -> archiver.archiveIfRequired(context));
+
+ // Verify that OOM metric is recorded
+ Map<String, Long> metrics = archiver.getMetrics();
+ assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_OOM_FAILURE));
+ assertEquals(17L, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_ALL_COMMITS));
Review Comment:
let's validate all the other metrics as well.
Also, let's ensure the timeline contains a mix of ingestion commits, replace commits, cleans, and rollbacks, that archival archives a subset of each action type, and that the metrics reflect the right values.
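A sketch of what that mixed-timeline setup could look like (the `HoodieTestTable`-style helper signatures and the expected counts are placeholders, and the clean/rollback constants anticipate the additions suggested in the next comment):
```java
// Hypothetical mixed timeline: write commits plus a replace commit, a clean,
// and a rollback, so archival touches every action type.
testTable.addCommit("00000001");
testTable.addCommit("00000002");
testTable.addReplaceCommit("00000003");
testTable.addClean("00000004");
testTable.addRollback("00000005");

archiver.archiveIfRequired(context);

// Assert the per-action breakdown, not just the aggregate count.
Map<String, Long> metrics = archiver.getMetrics();
assertEquals(2L, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_WRITE_COMMITS));
assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_CLEAN_COMMITS));
assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_ROLLBACK_COMMITS));
```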
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/ArchivalMetrics.java:
##########
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+/**
+ * Constants for archival metrics.
+ */
+public final class ArchivalMetrics {
+ public static final String ARCHIVAL_OOM_FAILURE = "archivalOutOfMemory";
+ public static final String ARCHIVAL_NUM_ALL_COMMITS = "archivalNumAllCommits";
+ public static final String ARCHIVAL_NUM_WRITE_COMMITS = "archivalNumWriteCommits";
Review Comment:
can we add clean commits and rollbacks as well? why just write commits?
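For illustration, the additions could look like this (the constant names are assumptions, not part of the PR):
```java
// Hypothetical per-action counters alongside the existing constants.
public static final String ARCHIVAL_NUM_CLEAN_COMMITS = "archivalNumCleanCommits";
public static final String ARCHIVAL_NUM_ROLLBACK_COMMITS = "archivalNumRollbackCommits";
public static final String ARCHIVAL_NUM_REPLACE_COMMITS = "archivalNumReplaceCommits";
```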