This is an automated email from the ASF dual-hosted git repository.

forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c00ea84985 [HUDI-3505] Add call procedure for 
UpgradeOrDowngradeCommand (#6012)
c00ea84985 is described below

commit c00ea8498527d44de127ba34700dfe5000db3652
Author: superche <[email protected]>
AuthorDate: Sun Jul 3 08:47:48 2022 +0800

    [HUDI-3505] Add call procedure for UpgradeOrDowngradeCommand (#6012)
    
    Co-authored-by: superche <[email protected]>
---
 .../hudi/command/procedures/HoodieProcedures.scala |   2 +
 .../procedures/UpgradeOrDowngradeProcedure.scala   | 107 +++++++++++++++++++++
 .../TestUpgradeOrDowngradeProcedure.scala          |  97 +++++++++++++++++++
 3 files changed, 206 insertions(+)

diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index d066ae5bcd..b8893ac80d 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -64,6 +64,8 @@ object HoodieProcedures {
     mapBuilder.put(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
     mapBuilder.put(ShowBootstrapMappingProcedure.NAME, 
ShowBootstrapMappingProcedure.builder)
     mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME, 
ShowBootstrapPartitionsProcedure.builder)
+    mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
+    mapBuilder.put(DowngradeTableProcedure.NAME, 
DowngradeTableProcedure.builder)
     mapBuilder.build
   }
 }
diff --git 
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
new file mode 100644
index 0000000000..792d26b184
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
+import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.util.Option
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig, 
HoodieWriteConfig}
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper, 
UpgradeDowngrade}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField, 
StructType}
+
+import java.util.function.Supplier
+import scala.util.{Failure, Success, Try}
+
+// Spark SQL procedure backing both `upgrade_table` and `downgrade_table`:
+// moves a Hudi table's layout to the requested HoodieTableVersion and
+// returns a single-row result set with a Boolean `result` column.
+class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder 
with Logging {
+  // Both arguments are mandatory: the table identifier and the target
+  // HoodieTableVersion name (the test exercises 'ZERO' and 'ONE').
+  private val PARAMETERS = Array[ProcedureParameter](
+    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
+    ProcedureParameter.required(1, "toVersion", DataTypes.StringType, None)
+  )
+
+  // Output schema: one nullable Boolean column named "result".
+  private val OUTPUT_TYPE = new StructType(Array[StructField](
+    StructField("result", DataTypes.BooleanType, nullable = true, 
Metadata.empty))
+  )
+
+  def parameters: Array[ProcedureParameter] = PARAMETERS
+
+  def outputType: StructType = OUTPUT_TYPE
+
+  // Runs the upgrade/downgrade. Never throws for migration failures:
+  // any exception (including an invalid version name rejected by
+  // HoodieTableVersion.valueOf) is caught and surfaced as Row(false).
+  override def call(args: ProcedureArgs): Seq[Row] = {
+    super.checkArgs(PARAMETERS, args)
+
+    val tableName = getArgValueOrDefault(args, PARAMETERS(0))
+    val toVersion = getArgValueOrDefault(args, 
PARAMETERS(1)).get.asInstanceOf[String]
+    val basePath = getBasePath(tableName)
+
+    // Build a meta client from a write config with rollback-using-markers
+    // enabled; the active timeline is not loaded up front.
+    val config = getWriteConfigWithTrue(basePath)
+    val metaClient = HoodieTableMetaClient.builder
+      .setConf(jsc.hadoopConfiguration)
+      .setBasePath(config.getBasePath)
+      .setLoadActiveTimelineOnLoad(false)
+      .setConsistencyGuardConfig(config.getConsistencyGuardConfig)
+      .setLayoutVersion(Option.of(new 
TimelineLayoutVersion(config.getTimelineLayoutVersion)))
+      .setFileSystemRetryConfig(config.getFileSystemRetryConfig)
+      .build
+
+    // NOTE(review): the second argument to run() is a null instant time —
+    // presumably UpgradeDowngrade derives it internally; confirm upstream.
+    val result = Try {
+      new UpgradeDowngrade(metaClient, config, new 
HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance)
+        .run(HoodieTableVersion.valueOf(toVersion), null)
+    } match {
+      case Success(_) =>
+        logInfo(s"Table at $basePath upgraded / downgraded to version 
$toVersion.")
+        true
+      case Failure(e) =>
+        logWarning(s"Failed: Could not upgrade/downgrade table at $basePath to 
version $toVersion.", e)
+        false
+    }
+
+    Seq(Row(result))
+  }
+
+  // Write config used for the migration: markers-based rollback enabled,
+  // EAGER failed-writes cleaning, BLOOM index. ("WithTrue" refers to the
+  // withRollbackUsingMarkers(true) setting.)
+  private def getWriteConfigWithTrue(basePath: String) = {
+    HoodieWriteConfig.newBuilder
+      .withPath(basePath)
+      .withRollbackUsingMarkers(true)
+      
.withCompactionConfig(HoodieCompactionConfig.newBuilder.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER).build)
+      
.withIndexConfig(HoodieIndexConfig.newBuilder.withIndexType(HoodieIndex.IndexType.BLOOM).build)
+      .build
+  }
+
+  override def build = new UpgradeOrDowngradeProcedure()
+}
+
+// Registration entry for `call upgrade_table(...)`; reuses the shared
+// UpgradeOrDowngradeProcedure implementation.
+object UpgradeTableProcedure {
+  val NAME = "upgrade_table"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new UpgradeOrDowngradeProcedure()
+  }
+}
+
+// Registration entry for `call downgrade_table(...)`; reuses the shared
+// UpgradeOrDowngradeProcedure implementation.
+object DowngradeTableProcedure {
+  val NAME = "downgrade_table"
+
+  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
+    override def get() = new UpgradeOrDowngradeProcedure()
+  }
+}
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
new file mode 100644
index 0000000000..55c184ab56
--- /dev/null
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient, 
HoodieTableVersion}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+import java.io.IOException
+
+// End-to-end test for the upgrade_table / downgrade_table procedures:
+// creates a table (version FOUR by default), downgrades it to ZERO, then
+// upgrades to ONE, checking hoodie.table.version both via the meta client
+// and directly in the hoodie.properties file after each step.
+class TestUpgradeOrDowngradeProcedure extends HoodieSparkSqlTestBase {
+
+  test("Test Call downgrade_table and upgrade_table Procedure") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | tblproperties (
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+      // Check required fields
+      checkExceptionContain(s"""call downgrade_table(table => 
'$tableName')""")(
+        s"Argument: toVersion is required")
+
+      var metaClient = HoodieTableMetaClient.builder
+        .setConf(new 
JavaSparkContext(spark.sparkContext).hadoopConfiguration())
+        .setBasePath(tablePath)
+        .build
+
+      // verify hoodie.table.version of the original table
+      assertResult(HoodieTableVersion.FOUR.versionCode) {
+        metaClient.getTableConfig.getTableVersion.versionCode()
+      }
+      assertTableVersionFromPropertyFile(metaClient, 
HoodieTableVersion.FOUR.versionCode)
+
+      // downgrade table to ZERO
+      checkAnswer(s"""call downgrade_table(table => '$tableName', toVersion => 
'ZERO')""")(Seq(true))
+
+      // verify the downgraded hoodie.table.version
+      metaClient = HoodieTableMetaClient.reload(metaClient)
+      assertResult(HoodieTableVersion.ZERO.versionCode) {
+        metaClient.getTableConfig.getTableVersion.versionCode()
+      }
+      assertTableVersionFromPropertyFile(metaClient, 
HoodieTableVersion.ZERO.versionCode)
+
+      // upgrade table to ONE
+      checkAnswer(s"""call upgrade_table(table => '$tableName', toVersion => 
'ONE')""")(Seq(true))
+
+      // verify the upgraded hoodie.table.version
+      metaClient = HoodieTableMetaClient.reload(metaClient)
+      assertResult(HoodieTableVersion.ONE.versionCode) {
+        metaClient.getTableConfig.getTableVersion.versionCode()
+      }
+      assertTableVersionFromPropertyFile(metaClient, 
HoodieTableVersion.ONE.versionCode)
+    }
+  }
+
+  // Reads hoodie.properties from the table's meta path and asserts the
+  // persisted hoodie.table.version matches the expected version code.
+  // Guards against the meta client caching a stale value.
+  @throws[IOException]
+  private def assertTableVersionFromPropertyFile(metaClient: 
HoodieTableMetaClient, versionCode: Int): Unit = {
+    val propertyFile = new Path(metaClient.getMetaPath + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE)
+    // Load the properties and verify
+    val fsDataInputStream = metaClient.getFs.open(propertyFile)
+    val hoodieConfig = HoodieConfig.create(fsDataInputStream)
+    fsDataInputStream.close()
+    assertResult(Integer.toString(versionCode)) {
+      hoodieConfig.getString(HoodieTableConfig.VERSION)
+    }
+  }
+}

Reply via email to