This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new cda9dbca206 [HUDI-7129] Fix bug when upgrade from table version three using UpgradeOrDowngradeProcedure (#10147)
cda9dbca206 is described below

commit cda9dbca20698749287f9efc15c26a87dd484816
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Nov 22 18:04:39 2023 +0800

    [HUDI-7129] Fix bug when upgrade from table version three using UpgradeOrDowngradeProcedure (#10147)
---
 .../table/upgrade/ThreeToFourUpgradeHandler.java   |  6 +++++
 .../procedures/UpgradeOrDowngradeProcedure.scala   | 15 ++++++++----
 .../TestUpgradeOrDowngradeProcedure.scala          | 27 ++++++++++++++++++++++
 3 files changed, 43 insertions(+), 5 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java
index 4da675ea820..c7cb544aec9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java
@@ -22,12 +22,14 @@ package org.apache.hudi.table.upgrade;
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.StringUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.metadata.MetadataPartitionType;
 
 import java.util.Hashtable;
 import java.util.Map;
 
+import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
 import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
 import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
@@ -40,6 +42,10 @@ public class ThreeToFourUpgradeHandler implements UpgradeHandler {
   @Override
   public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
     Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>();
+    String database = config.getString(DATABASE_NAME);
+    if (StringUtils.nonEmpty(database)) {
+      tablePropsToAdd.put(DATABASE_NAME, database);
+    }
     tablePropsToAdd.put(TABLE_CHECKSUM, String.valueOf(HoodieTableConfig.generateChecksum(config.getProps())));
     // if metadata is enabled and files partition exist then update TABLE_METADATA_INDEX_COMPLETED
     // schema for the files partition is same between the two versions
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
index 0ae413040bc..b94f0966575 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
@@ -20,16 +20,18 @@ package org.apache.spark.sql.hudi.command.procedures
 import org.apache.hudi.client.common.HoodieSparkEngineContext
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
 import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
-import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
 import org.apache.hudi.common.util.Option
 import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig, HoodieCleanConfig}
 import org.apache.hudi.index.HoodieIndex
 import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
+import org.apache.hudi.HoodieCLIUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
 
 import java.util.function.Supplier
+import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
 class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder with Logging {
@@ -51,9 +53,8 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder wi
 
     val tableName = getArgValueOrDefault(args, PARAMETERS(0))
     val toVersion = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
-    val basePath = getBasePath(tableName)
-
-    val config = getWriteConfigWithTrue(basePath)
+    val config = getWriteConfigWithTrue(tableName)
+    val basePath = config.getBasePath
     val metaClient = HoodieTableMetaClient.builder
       .setConf(jsc.hadoopConfiguration)
       .setBasePath(config.getBasePath)
@@ -78,12 +79,16 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder wi
     Seq(Row(result))
   }
 
-  private def getWriteConfigWithTrue(basePath: String) = {
+  private def getWriteConfigWithTrue(tableOpt: scala.Option[Any]) = {
+    val basePath = getBasePath(tableOpt)
+    val (tableName, database) = HoodieCLIUtils.getTableIdentifier(tableOpt.get.asInstanceOf[String])
     HoodieWriteConfig.newBuilder
+      .forTable(tableName)
       .withPath(basePath)
       .withRollbackUsingMarkers(true)
       .withCleanConfig(HoodieCleanConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build)
       .withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build)
+      .withProps(Map(HoodieTableConfig.DATABASE_NAME.key -> database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase)).asJava)
       .build
   }
 
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
index ff4b5aa92ea..1bd29cabc40 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
@@ -82,6 +82,33 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test Call upgrade_table from version three") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      // downgrade table to THREE
+      checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'THREE')""")(Seq(true))
+      // upgrade table to FOUR
+      checkAnswer(s"""call upgrade_table(table => '$tableName', to_version => 'FOUR')""")(Seq(true))
+    }
+  }
+
   @throws[IOException]
   private def assertTableVersionFromPropertyFile(metaClient: HoodieTableMetaClient, versionCode: Int): Unit = {
     val propertyFile = new Path(metaClient.getMetaPath + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE)

Reply via email to