bvaradar commented on code in PR #12327:
URL: https://github.com/apache/hudi/pull/12327#discussion_r1855583971
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java:
##########
@@ -102,19 +102,41 @@ public int archiveIfRequired(HoodieEngineContext context,
boolean acquireLock) t
return 0;
}
+ try {
+ List<HoodieInstant> instantsToArchive = getInstantsToArchive();
+ return archiveInstants(context, instantsToArchive, false);
Review Comment:
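Same comment as on TimelineArchiverV1: avoid locking twice in this code path; keep the locking/unlocking in the public methods and delegate the actual archiving to a private method.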
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v1/TimelineArchiverV1.java:
##########
@@ -153,6 +153,21 @@ public int archiveIfRequired(HoodieEngineContext context,
boolean acquireLock) t
txnManager.beginTransaction(Option.empty(), Option.empty());
}
List<HoodieInstant> instantsToArchive =
getInstantsToArchive().collect(Collectors.toList());
+ return archiveInstants(context, instantsToArchive, false);
Review Comment:
We will be locking twice in this code path. Can we instead introduce a
private method that archives a given list of instants, and have the
locking/unlocking managed in the public methods?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/timeline/versioning/v2/TimelineArchiverV2.java:
##########
@@ -102,19 +102,41 @@ public int archiveIfRequired(HoodieEngineContext context,
boolean acquireLock) t
return 0;
}
+ try {
+ List<HoodieInstant> instantsToArchive = getInstantsToArchive();
+ return archiveInstants(context, instantsToArchive, false);
+ } finally {
+ if (acquireLock) {
+ txnManager.endTransaction(Option.empty());
+ }
+ }
+ }
+
+ @Override
+ public int archiveInstants(HoodieEngineContext context, List<HoodieInstant>
instantsToArchive, boolean acquireLock) throws IOException {
+ try {
+ if (acquireLock) {
+ // there is no owner or instant time per se for archival.
+ txnManager.beginTransaction(Option.empty(), Option.empty());
+ }
+ } catch (HoodieLockException e) {
+ LOG.error("Fail to begin transaction", e);
+ return 0;
+ }
+
try {
// Sort again because the cleaning and rollback instants could break the
sequence.
- List<ActiveAction> instantsToArchive =
getInstantsToArchive().sorted().collect(Collectors.toList());
- if (!instantsToArchive.isEmpty()) {
- LOG.info("Archiving and deleting instants {}", instantsToArchive);
+ List<ActiveAction> activeActions =
getActiveActionsToArchive(instantsToArchive).sorted().collect(Collectors.toList());
+ if (!activeActions.isEmpty()) {
+ LOG.info("Archiving and deleting instants {}", activeActions);
Consumer<Exception> exceptionHandler = e -> {
if (this.config.isFailOnTimelineArchivingEnabled()) {
throw new HoodieException(e);
}
};
- this.timelineWriter.write(instantsToArchive, Option.of(action ->
deleteAnyLeftOverMarkers(context, action)), Option.of(exceptionHandler));
+ this.timelineWriter.write(activeActions, Option.of(action ->
deleteAnyLeftOverMarkers(context, action)), Option.of(exceptionHandler));
LOG.debug("Deleting archived instants");
- deleteArchivedInstants(instantsToArchive, context);
+ deleteArchivedInstants(activeActions, context);
Review Comment:
Rename to deleteArchivedActions, since it now takes the list of active actions?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java:
##########
@@ -180,4 +191,165 @@ public static long
convertCompletionTimeToEpoch(HoodieInstant instant) {
return -1;
}
}
+
+ static void rollbackFailedWritesAndCompact(HoodieTable table,
HoodieEngineContext context, HoodieWriteConfig config,
+ SupportsUpgradeDowngrade
upgradeDowngradeHelper, boolean shouldCompact) {
+ try {
+ // set required configs for rollback
+
HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
+ HoodieWriteConfig rollbackWriteConfig =
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
+ // set eager cleaning
+
rollbackWriteConfig.setValue(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY.key(),
HoodieFailedWritesCleaningPolicy.EAGER.name());
+ // TODO: Check if we should hardcode disable rollback using markers.
+ // With changes in https://github.com/apache/hudi/pull/12206, we
may not need to do that.
+
rollbackWriteConfig.setValue(HoodieWriteConfig.ROLLBACK_USING_MARKERS_ENABLE.key(),
String.valueOf(config.shouldRollbackUsingMarkers()));
+ // set compaction configs
+ if (shouldCompact) {
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(),
"true");
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"1");
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),
CompactionTriggerStrategy.NUM_COMMITS.name());
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(),
UnBoundedCompactionStrategy.class.getName());
+ } else {
+
rollbackWriteConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(),
"false");
+ }
+ // TODO: Check if we need to disable metadata table.
+ rollbackWriteConfig.setValue(HoodieMetadataConfig.ENABLE.key(),
String.valueOf(config.isMetadataTableEnabled()));
+ // TODO: This util method is used in both upgrade and downgrade.
+ // Check if we need to instantiate write client with correct table
version or is that handled internally.
+ try (BaseHoodieWriteClient writeClient =
upgradeDowngradeHelper.getWriteClient(rollbackWriteConfig, context)) {
+ writeClient.rollbackFailedWrites();
+ HoodieTableMetaClient.reload(table.getMetaClient());
+ if (shouldCompact) {
+ Option<String> compactionInstantOpt =
writeClient.scheduleCompaction(Option.empty());
+ if (compactionInstantOpt.isPresent()) {
+ writeClient.compact(compactionInstantOpt.get());
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ }
+
+ static void upgradeToLSMTimeline(HoodieTable table, HoodieEngineContext
engineContext, HoodieWriteConfig config) {
+
table.getMetaClient().getTableConfig().getTimelineLayoutVersion().ifPresent(
+ timelineLayoutVersion ->
ValidationUtils.checkState(TimelineLayoutVersion.LAYOUT_VERSION_1.equals(timelineLayoutVersion),
+ "Upgrade to LSM timeline is only supported for layout version 1.
Given version: " + timelineLayoutVersion));
+ HoodieArchivedTimeline archivedTimeline =
table.getMetaClient().getArchivedTimeline();
+ if (archivedTimeline.getInstants().isEmpty()) {
+ return;
+ }
+ Consumer<Exception> exceptionHandler = e -> {
+ if (config.isFailOnTimelineArchivingEnabled()) {
+ throw new HoodieException(e);
+ }
+ };
+ try {
+ HoodieTimelineArchiver archiver =
TimelineArchivers.getInstance(TimelineLayoutVersion.LAYOUT_VERSION_2, config,
table);
+ archiver.archiveInstants(engineContext, archivedTimeline.getInstants(),
true);
+ } catch (Exception e) {
+ exceptionHandler.accept(e);
+ }
+ }
+
+ static void downgradeFromLSMTimeline(HoodieTable table, HoodieEngineContext
engineContext, HoodieWriteConfig config) {
+ // if timeline layout version is present in the Option then check if it is
LAYOUT_VERSION_2
+
table.getMetaClient().getTableConfig().getTimelineLayoutVersion().ifPresent(
+ timelineLayoutVersion ->
ValidationUtils.checkState(TimelineLayoutVersion.LAYOUT_VERSION_2.equals(timelineLayoutVersion),
+ "Downgrade from LSM timeline is only supported for layout version
2. Given version: " + timelineLayoutVersion));
+ HoodieArchivedTimeline lsmArchivedTimeline =
table.getMetaClient().getArchivedTimeline();
+ if (lsmArchivedTimeline.getInstants().isEmpty()) {
+ return;
+ }
+ Consumer<Exception> exceptionHandler = e -> {
+ if (config.isFailOnTimelineArchivingEnabled()) {
+ throw new HoodieException("Failed to downgrade LSM timeline to old
archived format", e);
+ }
+ };
+ try {
+ HoodieTimelineArchiver archiver =
TimelineArchivers.getInstance(table.getMetaClient().getTimelineLayoutVersion(),
config, table);
+ archiver.archiveInstants(engineContext,
lsmArchivedTimeline.getInstants(), true);
+ } catch (Exception e) {
+ exceptionHandler.accept(e);
+ }
+ }
+
+ static boolean upgradeActiveTimelineInstant(HoodieInstant instant, String
originalFileName, HoodieTableMetaClient metaClient, CommitMetadataSerDeV1
commitMetadataSerDeV1,
+ CommitMetadataSerDeV2
commitMetadataSerDeV2, ActiveTimelineV2 activeTimelineV2) {
+ String replacedFileName = originalFileName;
+ boolean isCompleted = instant.isCompleted();
+ // Rename the metadata file name from the ${instant_time}.action[.state]
format in version 0.x
+ // to the ${instant_time}_${completion_time}.action[.state] format in
version 1.x.
+ if (isCompleted) {
+ String completionTime = instant.getCompletionTime(); // this is the file
modification time
+ String startTime = instant.requestedTime();
+ replacedFileName = replacedFileName.replace(startTime, startTime +
UNDERSCORE + completionTime);
+ }
+ // Rename the action if necessary (e.g., REPLACE_COMMIT_ACTION to
CLUSTERING_ACTION).
+ // NOTE: New action names were only applied for pending instants.
Completed instants do not have any change in action names.
+ if (SIX_TO_EIGHT_TIMELINE_ACTION_MAP.containsKey(instant.getAction()) &&
!isCompleted) {
+ replacedFileName = replacedFileName.replace(instant.getAction(),
SIX_TO_EIGHT_TIMELINE_ACTION_MAP.get(instant.getAction()));
+ }
+ try {
+ return renameTimelineV1InstantFileToV2Format(instant, metaClient,
originalFileName, replacedFileName, commitMetadataSerDeV1,
commitMetadataSerDeV2, activeTimelineV2);
+ } catch (IOException e) {
+ LOG.warn("Can not to complete the upgrade from version seven to version
eight. The reason for failure is {}", e.getMessage());
+ }
+ return false;
+ }
+
+ static boolean downgradeActiveTimelineInstant(HoodieInstant instant, String
originalFileName, HoodieTableMetaClient metaClient, CommitMetadataSerDeV2
commitMetadataSerDeV2,
Review Comment:
Can we move this to EightToSevenDowngradeHandler, since it is specific to that handler?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java:
##########
@@ -79,10 +79,13 @@ public class EightToSevenDowngradeHandler implements
DowngradeHandler {
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
Map<ConfigProperty, String> tablePropsToAdd = new HashMap<>();
- UpgradeDowngradeUtils.runCompaction(table, context, config,
upgradeDowngradeHelper);
- UpgradeDowngradeUtils.syncCompactionRequestedFileToAuxiliaryFolder(table);
+ // Rollback and run compaction in one step
Review Comment:
Question: should we reset the metadata table when downgrading from 1.x?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SevenToEightUpgradeHandler.java:
##########
@@ -86,38 +108,56 @@ public Map<ConfigProperty, String>
upgrade(HoodieWriteConfig config, HoodieEngin
ActiveTimelineV2 activeTimelineV2 = new ActiveTimelineV2(metaClient);
context.map(instants, instant -> {
String originalFileName =
instantFileNameGenerator.getFileName(instant);
- String replacedFileName = originalFileName;
- boolean isCompleted = instant.isCompleted();
- // Rename the metadata file name from the
${instant_time}.action[.state] format in version 0.x
- // to the ${instant_time}_${completion_time}.action[.state] format in
version 1.x.
- if (isCompleted) {
- String completionTime = instant.getCompletionTime(); // this is the
file modification time
- String startTime = instant.requestedTime();
- replacedFileName = replacedFileName.replace(startTime, startTime +
UNDERSCORE + completionTime);
- }
- // Rename the action if necessary (e.g., REPLACE_COMMIT_ACTION to
CLUSTERING_ACTION).
- // NOTE: New action names were only applied for pending instants.
Completed instants do not have any change in action names.
- if (SIX_TO_EIGHT_TIMELINE_ACTION_MAP.containsKey(instant.getAction())
&& !isCompleted) {
- replacedFileName = replacedFileName.replace(instant.getAction(),
SIX_TO_EIGHT_TIMELINE_ACTION_MAP.get(instant.getAction()));
- }
- try {
- return renameTimelineV1InstantFileToV2Format(instant, metaClient,
originalFileName, replacedFileName, commitMetadataSerDeV1,
commitMetadataSerDeV2, activeTimelineV2);
- } catch (IOException e) {
- LOG.warn("Can not to complete the upgrade from version seven to
version eight. The reason for failure is {}", e.getMessage());
- }
- return false;
+ return upgradeActiveTimelineInstant(instant, originalFileName,
metaClient, commitMetadataSerDeV1, commitMetadataSerDeV2, activeTimelineV2);
}, instants.size());
}
+ upgradeToLSMTimeline(table, context, config);
return tablePropsToAdd;
}
- private static void upgradePartitionFields(HoodieWriteConfig config,
HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
+ static void upgradePartitionFields(HoodieWriteConfig config,
HoodieTableConfig tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
String keyGenerator = tableConfig.getKeyGeneratorClassName();
String partitionPathField =
config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key());
if (keyGenerator != null && partitionPathField != null
&& (keyGenerator.equals(KeyGeneratorType.CUSTOM.getClassName()) ||
keyGenerator.equals(KeyGeneratorType.CUSTOM_AVRO.getClassName()))) {
tablePropsToAdd.put(HoodieTableConfig.PARTITION_FIELDS,
partitionPathField);
}
}
+
+ static void setInitialVersion(HoodieWriteConfig config, HoodieTableConfig
tableConfig, Map<ConfigProperty, String> tablePropsToAdd) {
+ tablePropsToAdd.put(HoodieTableConfig.INITIAL_VERSION,
HoodieTableVersion.SIX.name());
Review Comment:
I think we should retain whatever initial version the table already has, and
only set it if it is not already set.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]