This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d08b1e6cd3b2 fix: disable NBCC with default single writer for MDT for
downgrade from version 9 (#14109)
d08b1e6cd3b2 is described below
commit d08b1e6cd3b22e0358658d9d677892c49bd848ae
Author: Vamshi Krishna Kyatham
<[email protected]>
AuthorDate: Sun Oct 19 07:12:01 2025 -0700
fix: disable NBCC with default single writer for MDT for downgrade from
version 9 (#14109)
---
.../hudi/table/upgrade/UpgradeDowngradeUtils.java | 6 ++
.../TestUpgradeDowngradeConcurrencyControl.java | 95 ++++++++++++++++++++++
2 files changed, 101 insertions(+)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
index 673c3a211803..c3cc4eef4ed5 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngradeUtils.java
@@ -32,6 +32,7 @@ import org.apache.hudi.common.model.HoodieTableType;
import org.apache.hudi.common.model.OverwriteNonDefaultsWithLatestAvroPayload;
import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
import org.apache.hudi.common.model.PartialUpdateAvroPayload;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
import org.apache.hudi.common.model.debezium.MySqlDebeziumAvroPayload;
import org.apache.hudi.common.model.debezium.PostgresDebeziumAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -207,6 +208,11 @@ public class UpgradeDowngradeUtils {
properties.put(HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME.key(), NoopLockProvider.class.getName());
// if auto adjust it not disabled, chances that InProcessLockProvider will get overridden for single writer use-cases.
properties.put(HoodieWriteConfig.AUTO_ADJUST_LOCK_CONFIGS.key(), "false");
+ // if downgrading from table version 9, disable non-blocking concurrency control for MDT
+ // as version 8 and below do not support streaming, NBCC for MDT
+ if (table.isMetadataTable() && tableVersion.equals(HoodieTableVersion.NINE)) {
+ properties.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.SINGLE_WRITER.name());
+ }
HoodieWriteConfig rollbackWriteConfig = HoodieWriteConfig.newBuilder()
.withProps(properties)
.withWriteTableVersion(tableVersion.versionCode())
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeConcurrencyControl.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeConcurrencyControl.java
new file mode 100644
index 000000000000..82d44e8af462
--- /dev/null
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngradeConcurrencyControl.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.client.BaseHoodieWriteClient;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieTimelineTimeZone;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class TestUpgradeDowngradeConcurrencyControl {
+
+ @Test
+ void testConcurrencyModeForMDTDowngradeFromVersion9() throws Exception {
+ HoodieTable table = mock(HoodieTable.class);
+ HoodieTableMetaClient metaClient = mock(HoodieTableMetaClient.class);
+ HoodieTableConfig tableConfig = mock(HoodieTableConfig.class);
+ HoodieEngineContext context = mock(HoodieEngineContext.class);
+
+ when(table.isMetadataTable()).thenReturn(true);
+ when(table.getMetaClient()).thenReturn(metaClient);
+ when(metaClient.getTableConfig()).thenReturn(tableConfig);
+ when(metaClient.getTableType()).thenReturn(HoodieTableType.COPY_ON_WRITE);
+ when(tableConfig.getTimelineTimezone()).thenReturn(HoodieTimelineTimeZone.UTC);
+
+ TypedProperties props = new TypedProperties();
+ props.put(HoodieWriteConfig.BASE_PATH.key(), "/tmp/test-mdt-table");
+ props.put("hoodie.table.name", "test_mdt_table");
+ props.put(HoodieWriteConfig.WRITE_CONCURRENCY_MODE.key(), WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL.name());
+ HoodieWriteConfig config = mock(HoodieWriteConfig.class);
+ when(config.getProps()).thenReturn(props);
+ when(config.shouldRollbackUsingMarkers()).thenReturn(true);
+
+ AtomicReference<HoodieWriteConfig> capturedConfig = new AtomicReference<>();
+
+ SupportsUpgradeDowngrade upgradeDowngradeHelper = new SupportsUpgradeDowngrade() {
+ @Override
+ public HoodieTable getTable(HoodieWriteConfig config, HoodieEngineContext context) {
+ return table;
+ }
+
+ @Override
+ public String getPartitionColumns(HoodieWriteConfig config) {
+ return "";
+ }
+
+ @Override
+ public BaseHoodieWriteClient getWriteClient(HoodieWriteConfig config, HoodieEngineContext context) {
+ capturedConfig.set(config);
+ BaseHoodieWriteClient mockClient = mock(BaseHoodieWriteClient.class);
+ when(mockClient.rollbackFailedWrites(any())).thenReturn(true);
+ return mockClient;
+ }
+ };
+
+ UpgradeDowngradeUtils.rollbackFailedWritesAndCompact(
+ table, context, config, upgradeDowngradeHelper, false, HoodieTableVersion.NINE);
+
+ HoodieWriteConfig rollbackConfig = capturedConfig.get();
+ assertEquals(WriteConcurrencyMode.SINGLE_WRITER.name(),
+ rollbackConfig.getWriteConcurrencyMode().name(),
+ "WRITE_CONCURRENCY_MODE should be SINGLE_WRITER for MDT when downgrading from version 9");
+ }
+}
\ No newline at end of file