This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new e97aedc0ecb [HUDI-7859] Rename instant files to be consistent with 0.x 
naming format when downgrading table (#11545)
e97aedc0ecb is described below

commit e97aedc0ecb4dd079c3018c42f8b394d110a2ddc
Author: watermelon12138 <[email protected]>
AuthorDate: Wed Jul 17 15:40:03 2024 +0800

    [HUDI-7859] Rename instant files to be consistent with 0.x naming format 
when downgrading table (#11545)
    
    Co-authored-by: watermelon12138 <[email protected]>
---
 .../upgrade/EightToSevenDowngradeHandler.java      | 41 +++++++++++++++
 .../table/upgrade/SixToFiveDowngradeHandler.java   | 59 +--------------------
 ...radeHandler.java => UpgradeDowngradeUtils.java} | 60 ++++++----------------
 .../hudi/common/table/timeline/HoodieInstant.java  |  2 +-
 .../sql/hudi/common/HoodieSparkSqlTestBase.scala   |  4 ++
 .../TestUpgradeOrDowngradeProcedure.scala          | 53 +++++++++++++++++++
 6 files changed, 116 insertions(+), 103 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
index 3bb22481681..60d387ef439 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java
@@ -20,18 +20,59 @@ package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Map;
 
+import static org.apache.hudi.common.table.timeline.HoodieInstant.UNDERSCORE;
+
+
 /**
  * Version 7 is going to be placeholder version for bridge release 0.16.0.
  * Version 8 is the placeholder version to track 1.x.
  */
 public class EightToSevenDowngradeHandler implements DowngradeHandler {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(EightToSevenDowngradeHandler.class);
+
   @Override
   public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, 
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
+    final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+    UpgradeDowngradeUtils.runCompaction(table, context, config, 
upgradeDowngradeHelper);
+    UpgradeDowngradeUtils.syncCompactionRequestedFileToAuxiliaryFolder(table);
+
+    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.builder().setConf(context.getStorageConf().newInstance()).setBasePath(config.getBasePath()).build();
+    List<HoodieInstant> instants = 
metaClient.getActiveTimeline().getInstants();
+    if (!instants.isEmpty()) {
+      context.map(instants, instant -> {
+        if (instant.getFileName().contains(UNDERSCORE)) {
+          try {
+            // Rename the metadata file name from the 
${instant_time}_${completion_time}.action[.state] format in version 1.x to the 
${instant_time}.action[.state] format in version 0.x.
+            StoragePath fromPath = new StoragePath(metaClient.getMetaPath(), 
instant.getFileName());
+            StoragePath toPath = new StoragePath(metaClient.getMetaPath(), 
instant.getFileName().replaceAll(UNDERSCORE + "\\d+", ""));
+            boolean success = metaClient.getStorage().rename(fromPath, toPath);
+            // TODO: We need to rename the action-related part of the metadata 
file name here when we bring separate action name for clustering/compaction in 
1.x as well.
+            if (!success) {
+              throw new HoodieIOException("an error that occurred while 
renaming " + fromPath + " to: " + toPath);
+            }
+            return true;
+          } catch (IOException e) {
+            LOG.warn("Can not to complete the downgrade from version eight to 
version seven. The reason for failure is {}", e.getMessage());
+          }
+        }
+        return false;
+      }, instants.size());
+    }
     return Collections.emptyMap();
   }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
index 68938e895b0..f4199fff88c 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
@@ -18,27 +18,14 @@
 
 package org.apache.hudi.table.upgrade;
 
-import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.common.config.ConfigProperty;
-import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.util.FileIOUtils;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadataUtil;
-import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
-import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
-import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -64,9 +51,8 @@ public class SixToFiveDowngradeHandler implements 
DowngradeHandler {
     // Since version 6 includes a new schema field for metadata table(MDT), 
the MDT needs to be deleted during downgrade to avoid column drop error.
     HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
     // The log block version has been upgraded in version six so compaction is 
required for downgrade.
-    runCompaction(table, context, config, upgradeDowngradeHelper);
-
-    syncCompactionRequestedFileToAuxiliaryFolder(table);
+    UpgradeDowngradeUtils.runCompaction(table, context, config, 
upgradeDowngradeHelper);
+    UpgradeDowngradeUtils.syncCompactionRequestedFileToAuxiliaryFolder(table);
 
     HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.reload(table.getMetaClient());
     Map<ConfigProperty, String> updatedTableProps = new HashMap<>();
@@ -78,45 +64,4 @@ public class SixToFiveDowngradeHandler implements 
DowngradeHandler {
     return updatedTableProps;
   }
 
-  /**
-   * Utility method to run compaction for MOR table as part of downgrade step.
-   */
-  private void runCompaction(HoodieTable table, HoodieEngineContext context, 
HoodieWriteConfig config,
-                             SupportsUpgradeDowngrade upgradeDowngradeHelper) {
-    try {
-      if (table.getMetaClient().getTableType() == 
HoodieTableType.MERGE_ON_READ) {
-        // set required configs for scheduling compaction.
-        
HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
-        HoodieWriteConfig compactionConfig = 
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
-        compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(), 
"true");
-        
compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
 "1");
-        
compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),
 CompactionTriggerStrategy.NUM_COMMITS.name());
-        
compactionConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(), 
UnBoundedCompactionStrategy.class.getName());
-        compactionConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false");
-        try (BaseHoodieWriteClient writeClient = 
upgradeDowngradeHelper.getWriteClient(compactionConfig, context)) {
-          Option<String> compactionInstantOpt = 
writeClient.scheduleCompaction(Option.empty());
-          if (compactionInstantOpt.isPresent()) {
-            writeClient.compact(compactionInstantOpt.get());
-          }
-        }
-      }
-    } catch (Exception e) {
-      throw new HoodieException(e);
-    }
-  }
-
-  /**
-   * See HUDI-6040.
-   */
-  private static void syncCompactionRequestedFileToAuxiliaryFolder(HoodieTable 
table) {
-    HoodieTableMetaClient metaClient = table.getMetaClient();
-    HoodieTimeline compactionTimeline = new HoodieActiveTimeline(metaClient, 
false).filterPendingCompactionTimeline()
-        .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED);
-    compactionTimeline.getInstantsAsStream().forEach(instant -> {
-      String fileName = instant.getFileName();
-      FileIOUtils.copy(metaClient.getStorage(),
-          new StoragePath(metaClient.getMetaPath(), fileName),
-          new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName));
-    });
-  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
similarity index 57%
copy from 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
copy to 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
index 68938e895b0..b6e4db6e8e6 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/SixToFiveDowngradeHandler.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
@@ -19,11 +19,9 @@
 package org.apache.hudi.table.upgrade;
 
 import org.apache.hudi.client.BaseHoodieWriteClient;
-import org.apache.hudi.common.config.ConfigProperty;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieTableType;
-import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
@@ -34,55 +32,21 @@ import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieCompactionConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieException;
-import org.apache.hudi.metadata.HoodieTableMetadataUtil;
+import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.storage.StoragePath;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
 import 
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.io.IOException;
 
-import static 
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS;
-import static 
org.apache.hudi.common.table.HoodieTableConfig.TABLE_METADATA_PARTITIONS_INFLIGHT;
-
-/**
- * Downgrade handle to assist in downgrading hoodie table from version 6 to 5.
- * To ensure compatibility, we need recreate the compaction requested file to
- * .aux folder.
- * Since version 6 includes a new schema field for metadata table(MDT),
- * the MDT needs to be deleted during downgrade to avoid column drop error.
- * Also log block version was upgraded in version 6, therefore full compaction 
needs
- * to be completed during downgrade to avoid both read and future compaction 
failures.
- */
-public class SixToFiveDowngradeHandler implements DowngradeHandler {
-
-  @Override
-  public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config, 
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
-    final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
-
-    // Since version 6 includes a new schema field for metadata table(MDT), 
the MDT needs to be deleted during downgrade to avoid column drop error.
-    HoodieTableMetadataUtil.deleteMetadataTable(config.getBasePath(), context);
-    // The log block version has been upgraded in version six so compaction is 
required for downgrade.
-    runCompaction(table, context, config, upgradeDowngradeHelper);
-
-    syncCompactionRequestedFileToAuxiliaryFolder(table);
-
-    HoodieTableMetaClient metaClient = 
HoodieTableMetaClient.reload(table.getMetaClient());
-    Map<ConfigProperty, String> updatedTableProps = new HashMap<>();
-    HoodieTableConfig tableConfig = metaClient.getTableConfig();
-    Option.ofNullable(tableConfig.getString(TABLE_METADATA_PARTITIONS))
-        .ifPresent(v -> updatedTableProps.put(TABLE_METADATA_PARTITIONS, v));
-    
Option.ofNullable(tableConfig.getString(TABLE_METADATA_PARTITIONS_INFLIGHT))
-        .ifPresent(v -> 
updatedTableProps.put(TABLE_METADATA_PARTITIONS_INFLIGHT, v));
-    return updatedTableProps;
-  }
+public class UpgradeDowngradeUtils {
 
   /**
    * Utility method to run compaction for MOR table as part of downgrade step.
    */
-  private void runCompaction(HoodieTable table, HoodieEngineContext context, 
HoodieWriteConfig config,
-                             SupportsUpgradeDowngrade upgradeDowngradeHelper) {
+  public static void runCompaction(HoodieTable table, HoodieEngineContext 
context, HoodieWriteConfig config,
+                                   SupportsUpgradeDowngrade 
upgradeDowngradeHelper) {
     try {
       if (table.getMetaClient().getTableType() == 
HoodieTableType.MERGE_ON_READ) {
         // set required configs for scheduling compaction.
@@ -108,15 +72,21 @@ public class SixToFiveDowngradeHandler implements 
DowngradeHandler {
   /**
    * See HUDI-6040.
    */
-  private static void syncCompactionRequestedFileToAuxiliaryFolder(HoodieTable 
table) {
+  public static void syncCompactionRequestedFileToAuxiliaryFolder(HoodieTable 
table) {
     HoodieTableMetaClient metaClient = table.getMetaClient();
     HoodieTimeline compactionTimeline = new HoodieActiveTimeline(metaClient, 
false).filterPendingCompactionTimeline()
         .filter(instant -> instant.getState() == 
HoodieInstant.State.REQUESTED);
     compactionTimeline.getInstantsAsStream().forEach(instant -> {
       String fileName = instant.getFileName();
-      FileIOUtils.copy(metaClient.getStorage(),
-          new StoragePath(metaClient.getMetaPath(), fileName),
-          new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName));
+      try {
+        if (!metaClient.getStorage().exists(new 
StoragePath(metaClient.getMetaAuxiliaryPath(), fileName))) {
+          FileIOUtils.copy(metaClient.getStorage(),
+              new StoragePath(metaClient.getMetaPath(), fileName),
+              new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName));
+        }
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
     });
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
index 05d34241ec1..c8f1bd615c8 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieInstant.java
@@ -44,7 +44,7 @@ public class HoodieInstant implements Serializable, 
Comparable<HoodieInstant> {
 
   private static final String DELIMITER = ".";
 
-  private static final String UNDERSCORE = "_";
+  public static final String UNDERSCORE = "_";
 
   private static final String FILE_NAME_FORMAT_ERROR =
       "The provided file name %s does not conform to the required format";
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
index 794e7a322b3..432d2ab767e 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/common/HoodieSparkSqlTestBase.scala
@@ -42,6 +42,7 @@ import org.slf4j.LoggerFactory
 
 import java.io.File
 import java.util.TimeZone
+import java.util.regex.Pattern
 
 class HoodieSparkSqlTestBase extends FunSuite with BeforeAndAfterAll {
   org.apache.log4j.Logger.getRootLogger.setLevel(org.apache.log4j.Level.WARN)
@@ -249,6 +250,9 @@ class HoodieSparkSqlTestBase extends FunSuite with 
BeforeAndAfterAll {
 
 object HoodieSparkSqlTestBase {
 
+  // the naming format of 0.x version
+  final val NAME_FORMAT_0_X: Pattern = 
Pattern.compile("^(\\d+)(\\.\\w+)(\\.\\D+)?$")
+
   def getLastCommitMetadata(spark: SparkSession, tablePath: String) = {
     val metaClient = createMetaClient(spark, tablePath)
 
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
index bf7b509b970..fed1ea7c2c8 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala
@@ -22,9 +22,12 @@ import org.apache.hudi.common.table.{HoodieTableConfig, 
HoodieTableMetaClient, H
 import org.apache.hudi.common.util.{BinaryUtil, ConfigUtils, StringUtils}
 import org.apache.hudi.storage.StoragePath
 import org.apache.hudi.testutils.HoodieClientTestUtils.createMetaClient
+import org.apache.spark.sql.hudi.common.HoodieSparkSqlTestBase.NAME_FORMAT_0_X
 
 import java.io.IOException
 import java.time.Instant
+import scala.collection.JavaConverters._
+
 
 class TestUpgradeOrDowngradeProcedure extends HoodieSparkProcedureTestBase {
 
@@ -142,6 +145,56 @@ class TestUpgradeOrDowngradeProcedure extends 
HoodieSparkProcedureTestBase {
     }
   }
 
+  test("Test downgrade table from version eight to version seven") {
+    withTempDir { tmp =>
+      val tableName = generateTableName
+      val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+      // create table
+      spark.sql(
+        s"""
+           |create table $tableName (
+           |  id int,
+           |  name string,
+           |  price double,
+           |  ts long
+           |) using hudi
+           | location '$tablePath'
+           | options (
+           |  type = 'mor',
+           |  primaryKey = 'id',
+           |  preCombineField = 'ts'
+           | )
+       """.stripMargin)
+
+      spark.sql("set hoodie.compact.inline=true")
+      spark.sql("set hoodie.compact.inline.max.delta.commits=1")
+      spark.sql("set hoodie.clean.commits.retained = 2")
+      spark.sql("set hoodie.keep.min.commits = 3")
+      spark.sql("set hoodie.keep.min.commits = 4")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+      spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+
+      var metaClient = createMetaClient(spark, tablePath)
+      // verify hoodie.table.version of the table is EIGHT
+      if 
(metaClient.getTableConfig.getTableVersion.versionCode().equals(HoodieTableVersion.EIGHT.versionCode()))
 {
+        // downgrade table from version eight to version seven
+        checkAnswer(s"""call downgrade_table(table => '$tableName', to_version 
=> 'SEVEN')""")(Seq(true))
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        assertResult(HoodieTableVersion.SEVEN.versionCode) {
+          metaClient.getTableConfig.getTableVersion.versionCode()
+        }
+        // Verify whether the naming format of instant files is consistent 
with 0.x
+        
metaClient.reloadActiveTimeline().getInstants.iterator().asScala.forall(f => 
NAME_FORMAT_0_X.matcher(f.getFileName).find())
+        checkAnswer(s"select id, name, price, ts from $tableName")(
+          Seq(1, "a1", 10.0, 1000)
+        )
+      }
+    }
+  }
+
   @throws[IOException]
   private def assertTableVersionFromPropertyFile(metaClient: 
HoodieTableMetaClient, versionCode: Int): Unit = {
     val propertyFile = new StoragePath(metaClient.getMetaPath + "/" + 
HoodieTableConfig.HOODIE_PROPERTIES_FILE)

Reply via email to