danny0405 commented on code in PR #12327:
URL: https://github.com/apache/hudi/pull/12327#discussion_r1855727668
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java:
##########
@@ -180,4 +191,165 @@ public static long
convertCompletionTimeToEpoch(HoodieInstant instant) {
return -1;
}
}
+
+ static void rollbackFailedWritesAndCompact(HoodieTable table,
HoodieEngineContext context, HoodieWriteConfig config,
+ SupportsUpgradeDowngrade
upgradeDowngradeHelper, boolean shouldCompact) {
+ try {
+ // set required configs for rollback
+
HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
+ HoodieWriteConfig rollbackWriteConfig =
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
+ // set eager cleaning
+
rollbackWriteConfig.setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.EAGER.name());
+ // TODO: Check if we should hardcode disable rollback using markers.
+ // With changes in https://github.com/apache/hudi/pull/12206, we
may not need to do that.
+
rollbackWriteConfig.setValue(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key(),
String.valueOf(config.shouldRollbackUsingMarkers()));
+ // set compaction configs
+ if (shouldCompact) {
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(),
"true");
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"1");
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),
CompactionTriggerStrategy.NUM_COMMITS.name());
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(),
UnBoundedCompactionStrategy.class.getName());
+ } else {
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(),
"false");
+ }
+ // TODO: Check if we need to disable metadata table.
+ rollbackWriteConfig.setValue(HoodieMetadataConfig.ENABLE.key(),
String.valueOf(config.isMetadataTableEnabled()));
+ // TODO: This util method is used in both upgrade and downgrade.
+ // Check if we need to instantiate write client with correct table
version or is that handled internally.
+ try (BaseHoodieWriteClient writeClient =
upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) {
+ writeClient.rollbackFailedWrites();
+ HoodieTableMetaClient.reload(table.getMetaClient());
+ if (shouldCompact) {
+ Option<String> compactionInstantOpt =
writeClient.scheduleCompaction(Option.empty());
+ if (compactionInstantOpt.isPresent()) {
+ writeClient.compact(compactionInstantOpt.get());
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ }
+
+ static void upgradeToLSMTimeline(HoodieTable table, HoodieEngineContext
engineContext, HoodieWriteConfig config) {
+
table.getMetaClient().getTableConfig().getTimelineLayoutVersion().ifPresent(
+ timelineLayoutVersion ->
ValidationUtils.checkState(TimelineLayoutVersion.LAYOUT_VERSION_1.equals(timelineLayoutVersion),
+ "Upgrade to LSM timeline is only supported for layout version 1.
Given version: " + timelineLayoutVersion));
+ HoodieArchivedTimeline archivedTimeline =
table.getMetaClient().getArchivedTimeline();
+ if (archivedTimeline.getInstants().isEmpty()) {
+ return;
+ }
+ Consumer<Exception> exceptionHandler = e -> {
+ if (config.isFailOnTimelineArchivingEnabled()) {
+ throw new HoodieException(e);
+ }
+ };
+ try {
+ HoodieTimelineArchiver archiver =
TimelineArchivers.getInstance(TimelineLayoutVersion.LAYOUT_VERSION_2, config,
table);
+ archiver.archiveInstants(engineContext, archivedTimeline.getInstants(),
true);
+ } catch (Exception e) {
+ exceptionHandler.accept(e);
+ }
+ }
+
+ static void downgradeFromLSMTimeline(HoodieTable table, HoodieEngineContext
engineContext, HoodieWriteConfig config) {
+ // if timeline layout version is present in the Option then check if it is
LAYOUT_VERSION_2
+
table.getMetaClient().getTableConfig().getTimelineLayoutVersion().ifPresent(
+ timelineLayoutVersion ->
ValidationUtils.checkState(TimelineLayoutVersion.LAYOUT_VERSION_2.equals(timelineLayoutVersion),
+ "Downgrade from LSM timeline is only supported for layout version
2. Given version: " + timelineLayoutVersion));
+ HoodieArchivedTimeline lsmArchivedTimeline =
table.getMetaClient().getArchivedTimeline();
+ if (lsmArchivedTimeline.getInstants().isEmpty()) {
Review Comment:
Read the LSM archived instants through an iterator and flush them in accumulated
mini-batches. `ArchivedTimelineLoader.loadInstants` should be used instead.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java:
##########
@@ -180,4 +191,165 @@ public static long
convertCompletionTimeToEpoch(HoodieInstant instant) {
return -1;
}
}
+
+ static void rollbackFailedWritesAndCompact(HoodieTable table,
HoodieEngineContext context, HoodieWriteConfig config,
+ SupportsUpgradeDowngrade
upgradeDowngradeHelper, boolean shouldCompact) {
+ try {
+ // set required configs for rollback
+
HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
+ HoodieWriteConfig rollbackWriteConfig =
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
+ // set eager cleaning
+
rollbackWriteConfig.setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.EAGER.name());
+ // TODO: Check if we should hardcode disable rollback using markers.
+ // With changes in https://github.com/apache/hudi/pull/12206, we
may not need to do that.
+
rollbackWriteConfig.setValue(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key(),
String.valueOf(config.shouldRollbackUsingMarkers()));
+ // set compaction configs
+ if (shouldCompact) {
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(),
"true");
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"1");
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),
CompactionTriggerStrategy.NUM_COMMITS.name());
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(),
UnBoundedCompactionStrategy.class.getName());
+ } else {
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(),
"false");
+ }
+ // TODO: Check if we need to disable metadata table.
+ rollbackWriteConfig.setValue(HoodieMetadataConfig.ENABLE.key(),
String.valueOf(config.isMetadataTableEnabled()));
+ // TODO: This util method is used in both upgrade and downgrade.
+ // Check if we need to instantiate write client with correct table
version or is that handled internally.
+ try (BaseHoodieWriteClient writeClient =
upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) {
+ writeClient.rollbackFailedWrites();
+ HoodieTableMetaClient.reload(table.getMetaClient());
+ if (shouldCompact) {
+ Option<String> compactionInstantOpt =
writeClient.scheduleCompaction(Option.empty());
+ if (compactionInstantOpt.isPresent()) {
+ writeClient.compact(compactionInstantOpt.get());
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ }
+
+ static void upgradeToLSMTimeline(HoodieTable table, HoodieEngineContext
engineContext, HoodieWriteConfig config) {
+
table.getMetaClient().getTableConfig().getTimelineLayoutVersion().ifPresent(
+ timelineLayoutVersion ->
ValidationUtils.checkState(TimelineLayoutVersion.LAYOUT_VERSION_1.equals(timelineLayoutVersion),
+ "Upgrade to LSM timeline is only supported for layout version 1.
Given version: " + timelineLayoutVersion));
+ HoodieArchivedTimeline archivedTimeline =
table.getMetaClient().getArchivedTimeline();
+ if (archivedTimeline.getInstants().isEmpty()) {
+ return;
+ }
+ Consumer<Exception> exceptionHandler = e -> {
+ if (config.isFailOnTimelineArchivingEnabled()) {
+ throw new HoodieException(e);
+ }
+ };
+ try {
+ HoodieTimelineArchiver archiver =
TimelineArchivers.getInstance(TimelineLayoutVersion.LAYOUT_VERSION_2, config,
table);
+ archiver.archiveInstants(engineContext, archivedTimeline.getInstants(),
true);
Review Comment:
Read the legacy archived instants through an iterator and flush them in accumulated
mini-batches. `ArchivedTimelineLoader.loadInstants` should be used instead.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]