This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.1.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit ca9dea50b8ac4a6ca462acf90533573ea28cbf19 Author: Vamshi Krishna Kyatham <[email protected]> AuthorDate: Sun Oct 19 07:12:01 2025 -0700 fix: disable NBCC with default single writer for MDT for downgrade from version 9 (#14109) --- .../hudi/table/upgrade/UpgradeDowngradeUtils.java | 6 ++ .../TestUpgradeDowngradeConcurrencyControl.java | 95 ++++++++++++++++++++++ 2 files changed, 101 insertions(+) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java index 673c3a211803..c3cc4eef4ed5 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java @@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload; import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload; import org.apache.hudi.common.model.PartialUpdateAvroPayload; +import org.apache.hudi.common.model.WriteConcurrencyMode; import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload; import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload; import org.apache.hudi.common.table.HoodieTableConfig; @@ -207,6 +208,11 @@ public class UpgradeDowngradeUtils { properties.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), NoopLockProvider.class.getName()); // if auto adjust it not disabled, chances that InProcessLockProvider will get overridden for single writer use-cases. properties.put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "false"); + // if downgrading from table version 9, disable non-blocking concurrency control for MDT + // as version 8 and below do not support streaming, NBCC for MDT + if (table.isMetadataTable() && tableVersion.equals(HoodieTableVersion.NINE)) { + properties.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.SINGLE_WRITER.name()); + } HoodieWriteConfig rollbackWriteConfig = HoodieWriteConfig.newBuilder() .withProps(properties) .withWriteTableVersion(tableVersion.versionCode()) diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeConcurrencyControl.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeConcurrencyControl.java new file mode 100644 index 000000000000..82d44e8af462 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeConcurrencyControl.java @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.upgrade; + +import org.apache.hudi.client.BaseHoodieWriteClient; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.HoodieTimelineTimeZone; +import org.apache.hudi.common.model.WriteConcurrencyMode; +import org.apache.hudi.common.table.HoodieTableConfig; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.HoodieTableVersion; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.atomic.AtomicReference; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +class TestUpgradeDowngradeConcurrencyControl { + + @Test + void testConcurrencyModeForMDTDowngradeFromVersion9() throws Exception { + HoodieTable table = mock(HoodieTable.class); + HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class); + HoodieTableConfig tableConfig = mock(HoodieTableConfig.class); + HoodieEngineContext context = mock(HoodieEngineContext.class); + + when(table.isMetadataTable()).thenReturn(true); + when(table.getMetaClient()).thenReturn(metaClient); + when(metaClient.getTableConfig()).thenReturn(tableConfig); + when(metaClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE); + when(tableConfig.getTimelineTimezone()).thenReturn(HoodieTimelineTimeZone.UTC); + + TypedProperties props = new TypedProperties(); + props.put(HoodieWriteConfig.BASE_PATH.key(), "/tmp/test-mdt-table"); + props.put("hoodie.table.name", "test_mdt_table"); + props.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name()); + HoodieWriteConfig config = mock(HoodieWriteConfig.class); + when(config.getProps()).thenReturn(props); + when(config.shouldRollbackUsingMarkers()).thenReturn(true); + + AtomicReference<HoodieWriteConfig> capturedConfig = new AtomicReference<>(); + + SupportsUpgradeDowngrade upgradeDowngradeHelper = new SupportsUpgradeDowngrade() { + @Override + public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) { + return table; + } + + @Override + public String getPartitionColumns(HoodieWriteConfig config) { + return ""; + } + + @Override + public BaseHoodieWriteClient getWriteClient(HoodieWriteConfig config, HoodieEngineContext context) { + capturedConfig.set(config); + BaseHoodieWriteClient mockClient = mock(BaseHoodieWriteClient.class); + when(mockClient.rollbackFailedWrites(any())).thenReturn(true); + return mockClient; + } + }; + + UpgradeDowngradeUtils.rollbackFailedWritesAndCompact( + table, context, config, upgradeDowngradeHelper, false, HoodieTableVersion.NINE); + + HoodieWriteConfig rollbackConfig = capturedConfig.get(); + assertEquals(WriteConcurrencyMode.SINGLE_WRITER.name(), + rollbackConfig.getWriteConcurrencyMode().name(), + "WRITE_CONCURRENCY_MODE should be SINGLE_WRITER for MDT when downgrading from version 9"); + } +} \ No newline at end of file
