nada-attia commented on code in PR #18133:
URL: https://github.com/apache/hudi/pull/18133#discussion_r2805187577
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/io/TestHoodieTimelineArchiver.java:
##########
@@ -2048,4 +2053,51 @@ private void assertInstantListEquals(List<HoodieInstant> expected, List<HoodieIn
assertEquals(expectedInstant.getState(), actualInstant.getState());
}
}
+
+ @Test
+ public void testArchiveWithOOMOnLargeCommitFile() throws Exception {
+ // Initialize table with archival config
+    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2);
+    writeConfig.setValue(HoodieArchivalConfig.COMMITS_ARCHIVAL_BATCH_SIZE, "20");
+
+    // Create multiple large commit files by adding many write stats
+ for (int i = 1; i < 20; i++) {
+      String largeCommitTime = String.format("%08d", i); // fixed-width padding keeps instant times lexicographically ordered once i reaches 10
+ testTable.addInflightCommit(largeCommitTime);
+ HoodieCommitMetadata largeCommitMetadata = new HoodieCommitMetadata();
+      largeCommitMetadata.addMetadata(HoodieCommitMetadata.SCHEMA_KEY, HoodieTestTable.PHONY_TABLE_SCHEMA);
+
+ // Add 500k write stats to simulate a large commit file
+ for (int j = 0; j < 500000; j++) {
+ HoodieWriteStat writeStat = new HoodieWriteStat();
+ writeStat.setPartitionPath("p1");
+ writeStat.setPath("p1/file_" + j);
+ writeStat.setFileId("file_" + j);
+ writeStat.setTotalWriteBytes(1);
+ writeStat.setFileSizeInBytes(1);
+ largeCommitMetadata.addWriteStat("p1", writeStat);
+ }
+
+ // Save the large commit
+      metaClient.getActiveTimeline().saveAsComplete(
+          new HoodieInstant(State.INFLIGHT, HoodieTimeline.COMMIT_ACTION, largeCommitTime, InstantComparatorV2.REQUESTED_TIME_BASED_COMPARATOR),
+          Option.of(largeCommitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+ metaClient.reloadActiveTimeline();
+ }
+
+ // Create archiver and attempt archival
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
+    HoodieTimelineArchiver archiver = TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(), writeConfig, table);
+
+    // Verify that archival throws OOM (batch size 20 lets the archiver pull all 19 large commit files into memory at once)
+    assertThrows(OutOfMemoryError.class, () -> archiver.archiveIfRequired(context));
+
+ // Verify that OOM metric is recorded
+ Map<String, Long> metrics = archiver.getMetrics();
+ assertEquals(1L, metrics.get(ArchivalMetrics.ARCHIVAL_OOM_FAILURE));
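+    // 19 instants were written and the config above retains 2, so 17 are expected in the archived-commits counter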
+ assertEquals(17L, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_ALL_COMMITS));
Review Comment:
Added another test case to verify the metric counts for all the different types of commits archived (see testArchivalMetricsWithMixedActionTypes); a rough sketch of the shape such a test could take is below.
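
For illustration only, a minimal sketch of that shape, assuming the same helpers and metric keys as the diff above; the per-action helpers (testTable.addCommit / addDeltaCommit) and the expected counts are assumptions, not the actual test:

  @Test
  public void testArchivalMetricsWithMixedActionTypes() throws Exception {
    HoodieWriteConfig writeConfig = initTestTableAndGetWriteConfig(false, 2, 3, 2);

    // Alternate commit and delta-commit instants (helper signatures assumed
    // to match the HoodieTestTable usage elsewhere in this test class)
    for (int i = 1; i <= 6; i++) {
      String instantTime = String.format("%08d", i);
      if (i % 2 == 0) {
        testTable.addDeltaCommit(instantTime);
      } else {
        testTable.addCommit(instantTime);
      }
    }
    metaClient.reloadActiveTimeline();

    HoodieTable table = HoodieSparkTable.create(writeConfig, context, metaClient);
    HoodieTimelineArchiver archiver = TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(), writeConfig, table);
    archiver.archiveIfRequired(context);

    // 6 instants minus the 2 retained by the config leaves 4 archived (illustrative numbers)
    Map<String, Long> metrics = archiver.getMetrics();
    assertEquals(4L, metrics.get(ArchivalMetrics.ARCHIVAL_NUM_ALL_COMMITS));
  }

The same pattern would extend to clean or replace instants if the real test asserts per-action counters alongside the aggregate one.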
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]