This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new e97aedc0ecb [HUDI-7859] Rename instant files to be consistent with 0.x
naming format when downgrade table (#11545)
e97aedc0ecb is described below
commit e97aedc0ecb4dd079c3018c42f8b394d110a2ddc
Author: watermelon12138 <[email protected]>
AuthorDate: Wed Jul 17 15:40:03 2024 +0800
[HUDI-7859] Rename instant files to be consistent with 0.x naming format
when downgrade table (#11545)
Co-authored-by: watermelon12138 <[email protected]>
---
.../upgrade/EightToSevenDowngradeHandler.java | 41 +++++++++++++++
.../table/upgrade/SixToFiveDowngradeHandler.java | 59 +--------------------
...radeHandler.java => UpgradeDowngradeUtils.java} | 60 ++++++----------------
.../hudi/common/table/timeline/HoodieInstant.java | 2 +-
.../sql/hudi/common/HoodieSparkSqlTestBase.scala | 4 ++
.../TestUpgradeOrDowngradeProcedure.scala | 53 +++++++++++++++++++
6 files changed, 116 insertions(+), 103 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
index 3bb22481681..60d387ef439 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
@@ -20,18 +20,59 @@ package org.apache.hudi.table.upgrade;
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+import static org.apache.hudi.common.table.timeline.HoodieInstant.UNDERSCORE;
+
+
/**
* Version 7 is going to be placeholder version for bridge release 0.16.0.
* Version 8 is the placeholder version to track 1.x.
*/
public class EightToSevenDowngradeHandler implements DowngradeHandler {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(EightToSevenDowngradeHandler.class);
+
@Override
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+ UpgradeDowngradeUtils.runCompaction(table, context, config,
upgradeDowngradeHelper);
+ UpgradeDowngradeUtils.syncCompactionRequestedFileToAuxiliaryFolder(table);
+
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getStorageConf().newInstance()).setBasePath(config.getBasePath()).build();
+ List<HoodieInstant> instants =
metaClient.getActiveTimeline().getInstants();
+ if (!instants.isEmpty()) {
+ context.map(instants, instant -> {
+ if (instant.getFileName().contains(UNDERSCORE)) {
+ try {
+ // Rename the metadata file name from the
${instant_time}_${completion_time}.action[.state] format in version 1.x to the
${instant_time}.action[.state] format in version 0.x.
+ StoragePath fromPath = new StoragePath(metaClient.getMetaPath(),
instant.getFileName());
+ StoragePath toPath = new StoragePath(metaClient.getMetaPath(),
instant.getFileName().replaceAll(UNDERSCORE + "\\d+", ""));
+ boolean success = metaClient.getStorage().rename(fromPath, toPath);
+ // TODO: We need to rename the action-related part of the metadata
file name here when we bring separate action name for clustering/compaction in
1.x as well.
+ if (!success) {
+            throw new HoodieIOException("An error occurred while renaming " + fromPath + " to: " + toPath);
+ }
+ return true;
+ } catch (IOException e) {
+            LOG.warn("Cannot complete the downgrade from version eight to version seven. The reason for failure is {}", e.getMessage());
+ }
+ }
+ return false;
+ }, instants.size());
+ }
return Collections.emptyMap();
}
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
index 68938e895b0..f4199fff88c 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
@@ -18,27 +18,14 @@
package org.apache.hudi.table.upgrade;
-import org.apache.hudi.client.BaseHoodieWriteClient;
import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.FileIOUtils;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
-import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import java.util.HashMap;
import java.util.Map;
@@ -64,9 +51,8 @@ public class SixToFiveDowngradeHandler implements
DowngradeHandler {
// Since version 6 includes a new schema field for metadata table(MDT),
the MDT needs to be deleted during downgrade to avoid column drop error.
HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
// The log block version has been upgraded in version six so compaction is
required for downgrade.
- runCompaction(table, context, config, upgradeDowngradeHelper);
-
- syncCompactionRequestedFileToAuxiliaryFolder(table);
+ UpgradeDowngradeUtils.runCompaction(table, context, config,
upgradeDowngradeHelper);
+ UpgradeDowngradeUtils.syncCompactionRequestedFileToAuxiliaryFolder(table);
HoodieTableMetaClient metaClient =
HoodieTableMetaClient.reload(table.getMetaClient());
Map<ConfigProperty, String> updatedTableProps = new HashMap<>();
@@ -78,45 +64,4 @@ public class SixToFiveDowngradeHandler implements
DowngradeHandler {
return updatedTableProps;
}
- /**
- * Utility method to run compaction for MOR table as part of downgrade step.
- */
- private void runCompaction(HoodieTable table, HoodieEngineContext context,
HoodieWriteConfig config,
- SupportsUpgradeDowngrade upgradeDowngradeHelper) {
- try {
- if (table.getMetaClient().getTableType() ==
HoodieTableType.MERGE_ON_READ) {
- // set required configs for scheduling compaction.
-
HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
- HoodieWriteConfig compactionConfig =
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
- compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(),
"true");
-
compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"1");
-
compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),
CompactionTriggerStrategy.NUM_COMMITS.name());
-
compactionConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(),
UnBoundedCompactionStrategy.class.getName());
- compactionConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false");
- try (BaseHoodieWriteClient writeClient =
upgradeDowngradeHelper.getWriteClient(compactionConfig, context)) {
- Option<String> compactionInstantOpt =
writeClient.scheduleCompaction(Option.empty());
- if (compactionInstantOpt.isPresent()) {
- writeClient.compact(compactionInstantOpt.get());
- }
- }
- }
- } catch (Exception e) {
- throw new HoodieException(e);
- }
- }
-
- /**
- * See HUDI-6040.
- */
- private static void syncCompactionRequestedFileToAuxiliaryFolder(HoodieTable
table) {
- HoodieTableMetaClient metaClient = table.getMetaClient();
- HoodieTimeline compactionTimeline = new HoodieActiveTimeline(metaClient,
false).filterPendingCompactionTimeline()
- .filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED);
- compactionTimeline.getInstantsAsStream().forEach(instant -> {
- String fileName = instant.getFileName();
- FileIOUtils.copy(metaClient.getStorage(),
- new StoragePath(metaClient.getMetaPath(), fileName),
- new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName));
- });
- }
}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
similarity index 57%
copy from
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
copy to
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
index 68938e895b0..b6e4db6e8e6 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
@@ -19,11 +19,9 @@
package org.apache.hudi.table.upgrade;
import org.apache.hudi.client.BaseHoodieWriteClient;
-import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -34,55 +32,21 @@ import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.storage.StoragePath;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
-import static
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
-import static
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT;
-
-/**
- * Downgrade handle to assist in downgrading hoodie table from version 6 to 5.
- * To ensure compatibility, we need recreate the compaction requested file to
- * .aux folder.
- * Since version 6 includes a new schema field for metadata table(MDT),
- * the MDT needs to be deleted during downgrade to avoid column drop error.
- * Also log block version was upgraded in version 6, therefore full compaction
needs
- * to be completed during downgrade to avoid both read and future compaction
failures.
- */
-public class SixToFiveDowngradeHandler implements DowngradeHandler {
-
- @Override
- public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
- final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
-
- // Since version 6 includes a new schema field for metadata table(MDT),
the MDT needs to be deleted during downgrade to avoid column drop error.
- HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
- // The log block version has been upgraded in version six so compaction is
required for downgrade.
- runCompaction(table, context, config, upgradeDowngradeHelper);
-
- syncCompactionRequestedFileToAuxiliaryFolder(table);
-
- HoodieTableMetaClient metaClient =
HoodieTableMetaClient.reload(table.getMetaClient());
- Map<ConfigProperty, String> updatedTableProps = new HashMap<>();
- HoodieTableConfig tableConfig = metaClient.getTableConfig();
- Option.ofNullable(tableConfig.getString(TABLE_METADATA_PARTITIONS))
- .ifPresent(v -> updatedTableProps.put(TABLE_METADATA_PARTITIONS, v));
-
Option.ofNullable(tableConfig.getString(TABLE_METADATA_PARTITIONS_INFLIGHT))
- .ifPresent(v ->
updatedTableProps.put(TABLE_METADATA_PARTITIONS_INFLIGHT, v));
- return updatedTableProps;
- }
+public class UpgradeDowngradeUtils {
/**
* Utility method to run compaction for MOR table as part of downgrade step.
*/
- private void runCompaction(HoodieTable table, HoodieEngineContext context,
HoodieWriteConfig config,
- SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+ public static void runCompaction(HoodieTable table, HoodieEngineContext
context, HoodieWriteConfig config,
+ SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
try {
if (table.getMetaClient().getTableType() ==
HoodieTableType.MERGE_ON_READ) {
// set required configs for scheduling compaction.
@@ -108,15 +72,21 @@ public class SixToFiveDowngradeHandler implements
DowngradeHandler {
/**
* See HUDI-6040.
*/
- private static void syncCompactionRequestedFileToAuxiliaryFolder(HoodieTable
table) {
+ public static void syncCompactionRequestedFileToAuxiliaryFolder(HoodieTable
table) {
HoodieTableMetaClient metaClient = table.getMetaClient();
HoodieTimeline compactionTimeline = new HoodieActiveTimeline(metaClient,
false).filterPendingCompactionTimeline()
.filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED);
compactionTimeline.getInstantsAsStream().forEach(instant -> {
String fileName = instant.getFileName();
- FileIOUtils.copy(metaClient.getStorage(),
- new StoragePath(metaClient.getMetaPath(), fileName),
- new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName));
+ try {
+ if (!metaClient.getStorage().exists(new
StoragePath(metaClient.getMetaAuxiliaryPath(), fileName))) {
+ FileIOUtils.copy(metaClient.getStorage(),
+ new StoragePath(metaClient.getMetaPath(), fileName),
+ new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName));
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
});
}
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 05d34241ec1..c8f1bd615c8 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -44,7 +44,7 @@ public class HoodieInstant implements Serializable,
Comparable<HoodieInstant> {
private static final String DELIMITER = ".";
- private static final String UNDERSCORE = "_";
+ public static final String UNDERSCORE = "_";
private static final String FILE_NAME_FORMAT_ERROR =
"The provided file name %s does not conform to the required format";
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 794e7a322b3..432d2ab767e 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory
import java.io.File
import java.util.TimeZone
+import java.util.regex.Pattern
class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
@@ -249,6 +250,9 @@ class HoodieSparkSqlTestBase extends FunSuite with
BeforeAndAfterAll {
object HoodieSparkSqlTestBase {
+ // the naming format of 0.x version
+ final val NAME_FORMAT_0_X: Pattern =
Pattern.compile("^(\\d+)(\\.\\w+)(\\.\\D+)?$")
+
def getLastCommitMetadata(spark: SparkSession, tablePath: String) = {
val metaClient = createMetaClient(spark, tablePath)
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
index bf7b509b970..fed1ea7c2c8 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
@@ -22,9 +22,12 @@ import org.apache.hudi.common.table.{HoodieTableConfig,
HoodieTableMetaClient, H
import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, StringUtils}
import org.apache.hudi.storage.StoragePath
import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.NAME_FORMAT_0_X
import java.io.IOException
import java.time.Instant
+import scala.collection.JavaConverters._
+
class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {
@@ -142,6 +145,56 @@ class TestUpgradeOrDowngradeProcedure extends
HoodieSparkProcedureTestBase {
}
}
+ test("Test downgrade table from version eight to version seven") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | options (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+
+ spark.sql("set hoodie.compact.inline=true")
+ spark.sql("set hoodie.compact.inline.max.delta.commits=1")
+ spark.sql("set hoodie.clean.commits.retained = 2")
+ spark.sql("set hoodie.keep.min.commits = 3")
+      spark.sql("set hoodie.keep.max.commits = 4")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+
+ var metaClient = createMetaClient(spark, tablePath)
+ // verify hoodie.table.version of the table is EIGHT
+ if
(metaClient.getTableConfig.getTableVersion.versionCode().equals(HoodieTableVersion.EIGHT.versionCode()))
{
+ // downgrade table from version eight to version seven
+ checkAnswer(s"""call downgrade_table(table => '$tableName', to_version
=> 'SEVEN')""")(Seq(true))
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertResult(HoodieTableVersion.SEVEN.versionCode) {
+ metaClient.getTableConfig.getTableVersion.versionCode()
+ }
+ // Verify whether the naming format of instant files is consistent
with 0.x
+      assert(metaClient.reloadActiveTimeline().getInstants.iterator().asScala.forall(f => NAME_FORMAT_0_X.matcher(f.getFileName).find()))
+ checkAnswer(s"select id, name, price, ts from $tableName")(
+ Seq(1, "a1", 10.0, 1000)
+ )
+ }
+ }
+ }
+
@throws[IOException]
private def assertTableVersionFromPropertyFile(metaClient:
HoodieTableMetaClient, versionCode: Int): Unit = {
val propertyFile = new StoragePath(metaClient.getMetaPath + "/" +
HoodieTableConfig.HOODIE_PROPERTIES_FILE)