This is an automated email from the ASF dual-hosted git repository.
forwardxu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c00ea84985 [HUDI-3505] Add call procedure for
UpgradeOrDowngradeCommand (#6012)
c00ea84985 is described below
commit c00ea8498527d44de127ba34700dfe5000db3652
Author: superche <[email protected]>
AuthorDate: Sun Jul 3 08:47:48 2022 +0800
[HUDI-3505] Add call procedure for UpgradeOrDowngradeCommand (#6012)
Co-authored-by: superche <[email protected]>
---
.../hudi/command/procedures/HoodieProcedures.scala | 2 +
.../procedures/UpgradeOrDowngradeProcedure.scala | 107 +++++++++++++++++++++
.../TestUpgradeOrDowngradeProcedure.scala | 97 +++++++++++++++++++
3 files changed, 206 insertions(+)
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
index d066ae5bcd..b8893ac80d 100644
---
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/HoodieProcedures.scala
@@ -64,6 +64,8 @@ object HoodieProcedures {
mapBuilder.put(RunBootstrapProcedure.NAME, RunBootstrapProcedure.builder)
mapBuilder.put(ShowBootstrapMappingProcedure.NAME,
ShowBootstrapMappingProcedure.builder)
mapBuilder.put(ShowBootstrapPartitionsProcedure.NAME,
ShowBootstrapPartitionsProcedure.builder)
+ mapBuilder.put(UpgradeTableProcedure.NAME, UpgradeTableProcedure.builder)
+ mapBuilder.put(DowngradeTableProcedure.NAME,
DowngradeTableProcedure.builder)
mapBuilder.build
}
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
new file mode 100644
index 0000000000..792d26b184
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/procedures/UpgradeOrDowngradeProcedure.scala
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.command.procedures
+
+import org.apache.hudi.client.common.HoodieSparkEngineContext
+import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy
+import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion
+import org.apache.hudi.common.table.{HoodieTableMetaClient, HoodieTableVersion}
+import org.apache.hudi.common.util.Option
+import org.apache.hudi.config.{HoodieCompactionConfig, HoodieIndexConfig,
HoodieWriteConfig}
+import org.apache.hudi.index.HoodieIndex
+import org.apache.hudi.table.upgrade.{SparkUpgradeDowngradeHelper,
UpgradeDowngrade}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{DataTypes, Metadata, StructField,
StructType}
+
+import java.util.function.Supplier
+import scala.util.{Failure, Success, Try}
+
class UpgradeOrDowngradeProcedure extends BaseProcedure with ProcedureBuilder with Logging {

  // Declared inputs: the table to act on and the target table version (both mandatory).
  private val PARAMETERS = Array[ProcedureParameter](
    ProcedureParameter.required(0, "table", DataTypes.StringType, None),
    ProcedureParameter.required(1, "toVersion", DataTypes.StringType, None)
  )

  // Single boolean column reporting whether the version change succeeded.
  private val OUTPUT_TYPE = new StructType(Array[StructField](
    StructField("result", DataTypes.BooleanType, nullable = true, Metadata.empty))
  )

  def parameters: Array[ProcedureParameter] = PARAMETERS

  def outputType: StructType = OUTPUT_TYPE

  /**
   * Upgrades or downgrades the named table to the requested version.
   *
   * Returns a single row containing `true` when the run completed, `false`
   * when it failed (the failure is logged, not rethrown).
   */
  override def call(args: ProcedureArgs): Seq[Row] = {
    super.checkArgs(PARAMETERS, args)

    val tableNameArg = getArgValueOrDefault(args, PARAMETERS(0))
    // checkArgs above guarantees the required argument is present, so .get is safe here.
    val targetVersion = getArgValueOrDefault(args, PARAMETERS(1)).get.asInstanceOf[String]
    val path = getBasePath(tableNameArg)

    val writeConfig = defaultWriteConfig(path)
    val client = metaClientFor(writeConfig)

    val succeeded = Try {
      new UpgradeDowngrade(client, writeConfig, new HoodieSparkEngineContext(jsc), SparkUpgradeDowngradeHelper.getInstance)
        .run(HoodieTableVersion.valueOf(targetVersion), null)
    } match {
      case Success(_) =>
        logInfo(s"Table at $path upgraded / downgraded to version $targetVersion.")
        true
      case Failure(e) =>
        logWarning(s"Failed: Could not upgrade/downgrade table at $path to version $targetVersion.", e)
        false
    }

    Seq(Row(succeeded))
  }

  // Builds a meta client mirroring the settings carried by the write config.
  private def metaClientFor(config: HoodieWriteConfig): HoodieTableMetaClient =
    HoodieTableMetaClient.builder
      .setConf(jsc.hadoopConfiguration)
      .setBasePath(config.getBasePath)
      .setLoadActiveTimelineOnLoad(false)
      .setConsistencyGuardConfig(config.getConsistencyGuardConfig)
      .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion)))
      .setFileSystemRetryConfig(config.getFileSystemRetryConfig)
      .build

  // Write config used for the version change: marker-based rollback, eager
  // cleanup of failed writes, BLOOM index.
  private def defaultWriteConfig(basePath: String): HoodieWriteConfig =
    HoodieWriteConfig.newBuilder
      .withPath(basePath)
      .withRollbackUsingMarkers(true)
      .withCompactionConfig(
        HoodieCompactionConfig.newBuilder
          .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.EAGER)
          .build)
      .withIndexConfig(
        HoodieIndexConfig.newBuilder
          .withIndexType(HoodieIndex.IndexType.BLOOM)
          .build)
      .build

  override def build = new UpgradeOrDowngradeProcedure()
}
+
object UpgradeTableProcedure {
  // Name registered in HoodieProcedures: invoked as `call upgrade_table(...)`.
  val NAME = "upgrade_table"

  /** Supplies a fresh builder; upgrade and downgrade share one implementation. */
  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new UpgradeOrDowngradeProcedure
  }
}
+
object DowngradeTableProcedure {
  // Name registered in HoodieProcedures: invoked as `call downgrade_table(...)`.
  val NAME = "downgrade_table"

  /** Supplies a fresh builder; upgrade and downgrade share one implementation. */
  def builder: Supplier[ProcedureBuilder] = new Supplier[ProcedureBuilder] {
    override def get(): ProcedureBuilder = new UpgradeOrDowngradeProcedure
  }
}
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
new file mode 100644
index 0000000000..55c184ab56
--- /dev/null
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hudi.procedure
+
+import org.apache.hadoop.fs.Path
+import org.apache.hudi.common.config.HoodieConfig
+import org.apache.hudi.common.table.{HoodieTableConfig, HoodieTableMetaClient,
HoodieTableVersion}
+import org.apache.spark.api.java.JavaSparkContext
+import org.apache.spark.sql.hudi.HoodieSparkSqlTestBase
+
+import java.io.IOException
+
class TestUpgradeOrDowngradeProcedure extends HoodieSparkSqlTestBase {

  test("Test Call downgrade_table and upgrade_table Procedure") {
    withTempDir { tmp =>
      val tableName = generateTableName
      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
      // create table
      spark.sql(
        s"""
           |create table $tableName (
           |  id int,
           |  name string,
           |  price double,
           |  ts long
           |) using hudi
           | location '$tablePath'
           | tblproperties (
           |  primaryKey = 'id',
           |  preCombineField = 'ts'
           | )
       """.stripMargin)
      // Check required fields
      checkExceptionContain(s"""call downgrade_table(table => '$tableName')""")(
        s"Argument: toVersion is required")

      var metaClient = HoodieTableMetaClient.builder
        .setConf(new JavaSparkContext(spark.sparkContext).hadoopConfiguration())
        .setBasePath(tablePath)
        .build

      // verify hoodie.table.version of the original table
      assertResult(HoodieTableVersion.FOUR.versionCode) {
        metaClient.getTableConfig.getTableVersion.versionCode()
      }
      assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.FOUR.versionCode)

      // downgrade table to ZERO
      checkAnswer(s"""call downgrade_table(table => '$tableName', toVersion => 'ZERO')""")(Seq(true))

      // verify the downgraded hoodie.table.version
      metaClient = HoodieTableMetaClient.reload(metaClient)
      assertResult(HoodieTableVersion.ZERO.versionCode) {
        metaClient.getTableConfig.getTableVersion.versionCode()
      }
      assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.ZERO.versionCode)

      // upgrade table to ONE
      checkAnswer(s"""call upgrade_table(table => '$tableName', toVersion => 'ONE')""")(Seq(true))

      // verify the upgraded hoodie.table.version
      metaClient = HoodieTableMetaClient.reload(metaClient)
      assertResult(HoodieTableVersion.ONE.versionCode) {
        metaClient.getTableConfig.getTableVersion.versionCode()
      }
      assertTableVersionFromPropertyFile(metaClient, HoodieTableVersion.ONE.versionCode)
    }
  }

  /**
   * Asserts that the version persisted in hoodie.properties matches `versionCode`.
   *
   * Reads the table's hoodie.properties file directly (rather than via the
   * cached table config) so the on-disk state is what is checked.
   */
  @throws[IOException]
  private def assertTableVersionFromPropertyFile(metaClient: HoodieTableMetaClient, versionCode: Int): Unit = {
    val propertyFile = new Path(metaClient.getMetaPath + "/" + HoodieTableConfig.HOODIE_PROPERTIES_FILE)
    // Load the properties and verify.
    val fsDataInputStream = metaClient.getFs.open(propertyFile)
    // Close in a finally block so the stream is not leaked if parsing throws.
    val hoodieConfig =
      try {
        HoodieConfig.create(fsDataInputStream)
      } finally {
        fsDataInputStream.close()
      }
    assertResult(Integer.toString(versionCode)) {
      hoodieConfig.getString(HoodieTableConfig.VERSION)
    }
  }
}