vinothchandar commented on a change in pull request #4124:
URL: https://github.com/apache/hudi/pull/4124#discussion_r757624429
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -438,33 +424,48 @@ private void
writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRec
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
metaClient, config, context,
SparkUpgradeDowngradeHelper.getInstance());
if
(upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
- if
(config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ try {
+ // Lock the upgrade step and the follow-on metadata table creation
+ // and the initial bootstrapping so that concurrent writers if any
+ // are blocked from racing with these one time operations.
this.txnManager.beginTransaction();
- try {
- // Ensure no inflight commits by setting EAGER policy and explicitly
cleaning all failed commits
- this.rollbackFailedWrites(getInstantsToRollback(metaClient,
HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)), true);
- new UpgradeDowngrade(
- metaClient, config, context,
SparkUpgradeDowngradeHelper.getInstance())
+ if
(config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()) {
+ // Ensure no in-flight commits by setting EAGER policy and
explicitly cleaning all failed commits
+ this.rollbackFailedWrites(getInstantsToRollback(metaClient,
+ HoodieFailedWritesCleaningPolicy.EAGER, Option.of(instantTime)),
true);
+ new UpgradeDowngrade(metaClient, config, context,
SparkUpgradeDowngradeHelper.getInstance())
.run(HoodieTableVersion.current(), instantTime);
- } finally {
- this.txnManager.endTransaction();
+ } else {
+ upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
}
- } else {
- upgradeDowngrade.run(HoodieTableVersion.current(), instantTime);
+ metaClient.reloadActiveTimeline();
+ initializeMetadataTable(Option.of(instantTime));
Review comment:
this already takes care of my suggestion on the other one.
##########
File path:
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
##########
@@ -438,33 +424,48 @@ private void
writeTableMetadataForTableServices(HoodieTable<T, JavaRDD<HoodieRec
UpgradeDowngrade upgradeDowngrade = new UpgradeDowngrade(
metaClient, config, context,
SparkUpgradeDowngradeHelper.getInstance());
if
(upgradeDowngrade.needsUpgradeOrDowngrade(HoodieTableVersion.current())) {
Review comment:
lets move the check also into the lock? otherwise two writers would
think they both need to upgrade and they just wait on the lock is all.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]