codope commented on code in PR #12327: URL: https://github.com/apache/hudi/pull/12327#discussion_r1857175380
########## hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java: ########## @@ -180,4 +191,165 @@ public static long convertCompletionTimeToEpoch(HoodieInstant instant) { return -1; } } + + static void rollbackFailedWritesAndCompact(HoodieTable table, HoodieEngineContext context, HoodieWriteConfig config, + SupportsUpgradeDowngrade upgradeDowngradeHelper, boolean shouldCompact) { + try { + // set required configs for rollback + HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone()); + HoodieWriteConfig rollbackWriteConfig = HoodieWriteConfig.newBuilder().withProps(config.getProps()).build(); + // set eager cleaning + rollbackWriteConfig.setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(), HoodieFailedWritesCleaningPolicy.EAGER.name()); + // TODO: Check if we should hardcode disable rollback using markers. + // With changes in https://github.com/apache/hudi/pull/12206, we may not need to do that. + rollbackWriteConfig.setValue(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key(), String.valueOf(config.shouldRollbackUsingMarkers())); + // set compaction configs + if (shouldCompact) { + rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(), "true"); + rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(), "1"); + rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(), CompactionTriggerStrategy.NUM_COMMITS.name()); + rollbackWriteConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), UnBoundedCompactionStrategy.class.getName()); + } else { + rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(), "false"); + } + // TODO: Check if we need to disable metadata table. + rollbackWriteConfig.setValue(HoodieMetadataConfig.ENABLE.key(), String.valueOf(config.isMetadataTableEnabled())); + // TODO: This util method is used in both upgrade and downgrade. + // Check if we need to instantiate write client with correct table version or is that handled internally. + try (BaseHoodieWriteClient writeClient = upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) { + writeClient.rollbackFailedWrites(); + HoodieTableMetaClient.reload(table.getMetaClient()); + if (shouldCompact) { + Option<String> compactionInstantOpt = writeClient.scheduleCompaction(Option.empty()); + if (compactionInstantOpt.isPresent()) { + writeClient.compact(compactionInstantOpt.get()); + } + } + } + } catch (Exception e) { + throw new HoodieException(e); + } + } + + static void upgradeToLSMTimeline(HoodieTable table, HoodieEngineContext engineContext, HoodieWriteConfig config) { + table.getMetaClient().getTableConfig().getTimelineLayoutVersion().ifPresent( + timelineLayoutVersion -> ValidationUtils.checkState(TimelineLayoutVersion.LAYOUT_VERSION_1.equals(timelineLayoutVersion), + "Upgrade to LSM timeline is only supported for layout version 1. Given version: " + timelineLayoutVersion)); + HoodieArchivedTimeline archivedTimeline = table.getMetaClient().getArchivedTimeline(); + if (archivedTimeline.getInstants().isEmpty()) { + return; + } + Consumer<Exception> exceptionHandler = e -> { + if (config.isFailOnTimelineArchivingEnabled()) { + throw new HoodieException(e); + } + }; + try { + HoodieTimelineArchiver archiver = TimelineArchivers.getInstance(TimelineLayoutVersion.LAYOUT_VERSION_2, config, table); + archiver.archiveInstants(engineContext, archivedTimeline.getInstants(), true); Review Comment: I have changed. Now, using `LegacyArchivedMetaEntryReader` and `LSMTimelineWriter` directly. Please check latest commit. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org