manojpec commented on a change in pull request #4124:
URL: https://github.com/apache/hudi/pull/4124#discussion_r757658750
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -438,33 +424,48 @@ private void writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRec
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
Review comment:
fixed.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -438,33 +424,48 @@ private void writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRec
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance());
if (upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
- if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ try {
+ // Lock the upgrade step and the follow-on metadata table creation
+ // and the initial bootstrapping so that concurrent writers if any
+ // are blocked from racing with these one time operations.
this.txnManager.beginTransaction();
- try {
- // Ensure no inflight commits by setting EAGER policy and explicitly cleaning all failed commits
- this.rollbackFailedWrites(getInstantsToRollback(metaClient, HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)), true);
- new UpgradeDowngrade(
- metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
+ if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ // Ensure no in-flight commits by setting EAGER policy and explicitly cleaning all failed commits
+ this.rollbackFailedWrites(getInstantsToRollback(metaClient,
+ HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)), true);
+ new UpgradeDowngrade(metaClient, config, context, SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.current(), instantTime);
- } finally {
- this.txnManager.endTransaction();
+ } else {
+ upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
}
- } else {
- upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
+ metaClient.reloadActiveTimeline();
+ initializeMetadataTable(Option.of(instantTime));
Review comment:
right.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]