This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new cda9dbca206 [HUDI-7129] Fix bug when upgrade from table version three using UpgradeOrDowngradeProcedure (#10147)
cda9dbca206 is described below
commit cda9dbca20698749287f9efc15c26a87dd484816
Author: Jing Zhang <[email protected]>
AuthorDate: Wed Nov 22 18:04:39 2023 +0800
[HUDI-7129] Fix bug when upgrade from table version three using UpgradeOrDowngradeProcedure (#10147)
---
.../table/upgrade/ThreeToFourUpgradeHandler.java | 6 +++++
.../procedures/UpgradeOrDowngradeProcedure.scala | 15 ++++++++----
.../TestUpgradeOrDowngradeProcedure.scala | 27 ++++++++++++++++++++++
3 files changed, 43 insertions(+), 5 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java
index 4da675ea820..c7cb544aec9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/ThreeToFourUpgradeHandler.java
@@ -22,12 +22,14 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.metadata.MetadataPartitionType;
import java.util.Hashtable;
import java.util.Map;
+import static org.apache.hudi.common.table.HoodieTableConfig.DATABASE_NAME;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_CHECKSUM;
import static org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
import static org.apache.hudi.metadata.HoodieTableMetadataUtil.metadataPartitionExists;
@@ -40,6 +42,10 @@ public class ThreeToFourUpgradeHandler implements UpgradeHandler {
@Override
public Map<ConfigProperty, String> upgrade(HoodieWriteConfig config, HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade upgradeDowngradeHelper) {
Map<ConfigProperty, String> tablePropsToAdd = new Hashtable<>();
+ String database = config.getString(DATABASE_NAME);
+ if (StringUtils.nonEmpty(database)) {
+ tablePropsToAdd.put(DATABASE_NAME, database);
+ }
tablePropsToAdd.put(TABLE_CHECKSUM,
String.valueOf(HoodieTableConfig.generateChecksum(config.getProps())));
// if metadata is enabled and files partition exist then update TABLE_METADATA_INDEX_COMPLETED
// schema for the files partition is same between the two versions
diff --git a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
index 0ae413040bc..b94f0966575 100644
--- a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
@@ -20,16 +20,18 @@ package org.apache.spark.sql.hudi.command.procedures
import org.apache.hudi.client.common.HoodieSparkEngineContext
import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
-import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, HoodieTableVersion}
import org.apache.hudi.common.util.Option
import org.apache.hudi.config.{HoodieIndexConfig, HoodieWriteConfig, HoodieCleanConfig}
import org.apache.hudi.index.HoodieIndex
import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, UpgradeDowngrade}
+import org.apache.hudi.HoodieCLIUtils
import org.apache.spark.internal.Logging
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, StructType}
import java.util.function.Supplier
+import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder with Logging {
@@ -51,9 +53,8 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder wi
val tableName = getArgValueOrDefault(args, PARAMETERS(0))
val toVersion = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
- val basePath = getBasePath(tableName)
-
- val config = getWriteConfigWithTrue(basePath)
+ val config = getWriteConfigWithTrue(tableName)
+ val basePath = config.getBasePath
val metaClient = HoodieTableMetaClient.builder
.setConf(jsc.hadoopConfiguration)
.setBasePath(config.getBasePath)
@@ -78,12 +79,16 @@ class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder wi
Seq(Row(result))
}
- private def getWriteConfigWithTrue(basePath: String) = {
+ private def getWriteConfigWithTrue(tableOpt: scala.Option[Any]) = {
+ val basePath = getBasePath(tableOpt)
+ val (tableName, database) = HoodieCLIUtils.getTableIdentifier(tableOpt.get.asInstanceOf[String])
HoodieWriteConfig.newBuilder
+ .forTable(tableName)
.withPath(basePath)
.withRollbackUsingMarkers(true)
.withCleanConfig(HoodieCleanConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build)
.withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build)
+ .withProps(Map(HoodieTableConfig.DATABASE_NAME.key -> database.getOrElse(sparkSession.sessionState.catalog.getCurrentDatabase)).asJava)
.build
}
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
index ff4b5aa92ea..1bd29cabc40 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
@@ -82,6 +82,33 @@ class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {
}
}
+ test("Test Call upgrade_table from version three") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | tblproperties (
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+
+ // downgrade table to THREE
+ checkAnswer(s"""call downgrade_table(table => '$tableName', to_version => 'THREE')""")(Seq(true))
+ // upgrade table to FOUR
+ checkAnswer(s"""call upgrade_table(table => '$tableName', to_version => 'FOUR')""")(Seq(true))
+ }
+ }
+
@throws[IOException]
private def assertTableVersionFromPropertyFile(metaClient: HoodieTableMetaClient, versionCode: Int): Unit = {
val propertyFile = new Path(metaClient.getMetaPath + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE)