codope commented on code in PR #11545:
URL: https://github.com/apache/hudi/pull/11545#discussion_r1671596452
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java:
##########
@@ -20,18 +20,53 @@
import org.apache.hudi.common.config.ConfigProperty;
import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+import java.io.IOException;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
+
/**
 * Version 7 is going to be a placeholder version for the bridge release 0.16.0.
* Version 8 is the placeholder version to track 1.x.
*/
public class EightToSevenDowngradeHandler implements DowngradeHandler {
@Override
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ final HoodieTable table = upgradeDowngradeHelper.getTable(config, context);
+ UpgradeDowngradeUtils.runCompaction(table, context, config,
upgradeDowngradeHelper);
+ UpgradeDowngradeUtils.syncCompactionRequestedFileToAuxiliaryFolder(table);
+
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getStorageConf().newInstance()).setBasePath(config.getBasePath()).build();
+ List<HoodieInstant> instants =
metaClient.getActiveTimeline().getInstants();
+ if (!instants.isEmpty()) {
+ context.map(instants, instant -> {
+ if (!instant.getFileName().contains("_")) {
+ return false;
+ }
+ try {
+ // Rename the metadata file name from the
${instant_time}_${completion_time}.action[.state] format in version 1.x to the
${instant_time}.action[.state] format in version 0.x.
+ StoragePath fromPath = new StoragePath(metaClient.getMetaPath(),
instant.getFileName());
+ StoragePath toPath = new StoragePath(metaClient.getMetaPath(),
instant.getFileName().replaceAll("_\\d+", ""));
+ boolean success = metaClient.getStorage().rename(fromPath, toPath);
+        // TODO: We need to rename the action-related part of the metadata
file name here when we bring separate action names for clustering/compaction in
1.x as well.
Review Comment:
Is there a separate ticket tracking this TODO?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/spark/sql/hudi/procedure/TestUpgradeOrDowngradeProcedure.scala:
##########
@@ -142,6 +143,56 @@ class TestUpgradeOrDowngradeProcedure extends
HoodieSparkProcedureTestBase {
}
}
+ test("Test downgrade table from version eight to version seven") {
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ // create table
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price double,
+ | ts long
+ |) using hudi
+ | location '$tablePath'
+ | options (
+ | type = 'mor',
+ | primaryKey = 'id',
+ | preCombineField = 'ts'
+ | )
+ """.stripMargin)
+
+ spark.sql("set hoodie.compact.inline=true")
+ spark.sql("set hoodie.compact.inline.max.delta.commits=1")
+ spark.sql("set hoodie.clean.commits.retained = 2")
+ spark.sql("set hoodie.keep.min.commits = 3")
+ spark.sql("set hoodie.keep.min.commits = 4")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+ spark.sql(s"insert into $tableName values(1, 'a1', 10, 1000)")
+
+ var metaClient = createMetaClient(spark, tablePath)
+ // verify hoodie.table.version of the table is EIGHT
+ if
(metaClient.getTableConfig.getTableVersion.versionCode().equals(HoodieTableVersion.EIGHT.versionCode()))
{
+ // downgrade table from version eight to version seven
+ checkAnswer(s"""call downgrade_table(table => '$tableName', to_version
=> 'SEVEN')""")(Seq(true))
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+ assertResult(HoodieTableVersion.SEVEN.versionCode) {
+ metaClient.getTableConfig.getTableVersion.versionCode()
+ }
+ // Verify whether the naming format of instant files is consistent
with 0.x
+
metaClient.reloadActiveTimeline().getInstants.iterator().asScala.forall(f =>
!f.getFileName.contains("_"))
Review Comment:
Can we add a pattern in `HoodieInstant` and make this check more strict? For
example, in HoodieInstant, we have `NAME_FORMAT` pattern which checks for both
0.x and 1.x name pattern. We can add a more strict pattern like NAME_FORMAT_0_X
which should fail for 1.x instant file names.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/EightToSevenDowngradeHandler.java:
##########
@@ -32,6 +39,28 @@
public class EightToSevenDowngradeHandler implements DowngradeHandler {
@Override
public Map<ConfigProperty, String> downgrade(HoodieWriteConfig config,
HoodieEngineContext context, String instantTime, SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ HoodieTableMetaClient metaClient =
HoodieTableMetaClient.builder().setConf(context.getStorageConf().newInstance()).setBasePath(config.getBasePath()).build();
+ List<HoodieInstant> instants =
metaClient.getActiveTimeline().getInstants();
+ if (!instants.isEmpty()) {
+ context.map(instants, instant -> {
+ if (!instant.getFileName().contains("_")) {
+ return false;
+ }
+ try {
+ // Rename the metadata file name from the
${instant_time}_${completion_time}.action[.state] format in version 1.x to the
${instant_time}.action[.state] format in version 0.x.
+ StoragePath fromPath = new StoragePath(metaClient.getMetaPath(),
instant.getFileName());
+ StoragePath toPath = new StoragePath(metaClient.getMetaPath(),
instant.getFileName().replaceAll("_\\d+", ""));
+ boolean success = metaClient.getStorage().rename(fromPath, toPath);
+          // TODO: We need to rename the action-related part of the metadata
file name here when we bring separate action names for clustering/compaction in
1.x as well.
+ if (!success) {
+            throw new HoodieIOException("Error when renaming the instant file: "
+ fromPath + " to: " + toPath);
+ }
+ return true;
+ } catch (IOException e) {
+          throw new HoodieException("Cannot complete the downgrade from
version eight to version seven.", e);
Review Comment:
Valid point! We need to call it out in docs and log a warning for the user.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieInstantTimeGenerator;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.compact.CompactionTriggerStrategy;
+import
org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
+
+import java.io.IOException;
+
+public class UpgradeDowngradeUtils {
+
+ /**
+ * Utility method to run compaction for MOR table as part of downgrade step.
+ */
+ public static void runCompaction(HoodieTable table, HoodieEngineContext
context, HoodieWriteConfig config,
+ SupportsUpgradeDowngrade
upgradeDowngradeHelper) {
+ try {
+ if (table.getMetaClient().getTableType() ==
HoodieTableType.MERGE_ON_READ) {
+ // set required configs for scheduling compaction.
+
HoodieInstantTimeGenerator.setCommitTimeZone(table.getMetaClient().getTableConfig().getTimelineTimezone());
+ HoodieWriteConfig compactionConfig =
HoodieWriteConfig.newBuilder().withProps(config.getProps()).build();
+ compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT.key(),
"true");
+
compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_NUM_DELTA_COMMITS.key(),
"1");
+
compactionConfig.setValue(HoodieCompactionConfig.INLINE_COMPACT_TRIGGER_STRATEGY.key(),
CompactionTriggerStrategy.NUM_COMMITS.name());
+
compactionConfig.setValue(HoodieCompactionConfig.COMPACTION_STRATEGY.key(),
UnBoundedCompactionStrategy.class.getName());
+ compactionConfig.setValue(HoodieMetadataConfig.ENABLE.key(), "false");
+ try (BaseHoodieWriteClient writeClient =
upgradeDowngradeHelper.getWriteClient(compactionConfig, context)) {
+ Option<String> compactionInstantOpt =
writeClient.scheduleCompaction(Option.empty());
+ if (compactionInstantOpt.isPresent()) {
+ writeClient.compact(compactionInstantOpt.get());
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new HoodieException(e);
+ }
+ }
+
+ /**
+ * See HUDI-6040.
+ */
+ public static void syncCompactionRequestedFileToAuxiliaryFolder(HoodieTable
table) {
+ HoodieTableMetaClient metaClient = table.getMetaClient();
+ HoodieTimeline compactionTimeline = new HoodieActiveTimeline(metaClient,
false).filterPendingCompactionTimeline()
+ .filter(instant -> instant.getState() ==
HoodieInstant.State.REQUESTED);
+ compactionTimeline.getInstantsAsStream().forEach(instant -> {
+ String fileName = instant.getFileName();
+ try {
+ if (!metaClient.getStorage().exists(new
StoragePath(metaClient.getMetaAuxiliaryPath(), fileName))) {
+ FileIOUtils.copy(metaClient.getStorage(),
+ new StoragePath(metaClient.getMetaPath(), fileName),
+ new StoragePath(metaClient.getMetaAuxiliaryPath(), fileName));
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException(e.getMessage(), e);
+ }
+ });
+ }
+}
Review Comment:
nit: new line at the end
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]