This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new fca45562180 [HUDI-8480] Adding new write config for streaming writes 
to metadata table and Enabling Non Blocking Concurrency Control with Metadata 
(#13292)
fca45562180 is described below

commit fca45562180c232dfefbf1fdf5cb3a639987f3c3
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Sat May 31 07:31:40 2025 -0700

    [HUDI-8480] Adding new write config for streaming writes to metadata table 
and Enabling Non Blocking Concurrency Control with Metadata (#13292)
    
    - Adding write config to support streaming writes to metadata table.
    Config is named "hoodie.metadata.streaming.write.enabled".
    - Enabling Non Blocking Concurrency Control with Metadata when streaming 
writes are enabled
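
    For quick reference, a minimal sketch of how a writer could opt in to the new
    flag. Only the config key comes from this commit; the class name, table path
    and everything else below is illustrative:

        import java.util.Properties;
        import org.apache.hudi.config.HoodieWriteConfig;

        // Hypothetical helper, not part of this commit.
        public class EnableMdtStreamingWritesExample {
          public static HoodieWriteConfig build(String basePath) {
            Properties props = new Properties();
            // Key introduced by this commit; the default is engine dependent
            // (enabled for Spark, disabled for Flink and Java).
            props.setProperty("hoodie.metadata.streaming.write.enabled", "true");
            return HoodieWriteConfig.newBuilder()
                .withPath(basePath)
                .withProperties(props)
                .build();
          }
        }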
---
 .../apache/hudi/client/utils/TransactionUtils.java |  2 +-
 .../org/apache/hudi/config/HoodieWriteConfig.java  | 44 ++++++++++++++---
 .../metadata/HoodieBackedTableMetadataWriter.java  |  2 +-
 .../hudi/metadata/HoodieMetadataWriteUtils.java    | 26 ++++++++--
 .../hudi/table/upgrade/UpgradeDowngrade.java       |  2 +-
 .../TestConflictResolutionStrategyUtil.java        | 23 ++++++---
 .../hudi/client/utils/TestTransactionUtils.java    | 47 ++++++++++++++++++
 .../apache/hudi/config/TestHoodieWriteConfig.java  | 27 +++++++++++
 .../metadata/TestHoodieMetadataWriteUtils.java     | 56 ++++++++++++++++++++--
 .../hudi/utils/HoodieWriterClientTestHarness.java  | 10 +++-
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  2 +-
 .../hudi/client/TestJavaHoodieBackedMetadata.java  |  1 +
 .../TestHoodieJavaClientOnCopyOnWriteStorage.java  |  5 ++
 .../hudi/testutils/TestHoodieMetadataBase.java     |  8 +++-
 .../hudi/client/SparkRDDMetadataWriteClient.java   |  6 +++
 .../SparkHoodieBackedTableMetadataWriter.java      |  3 +-
 .../client/TestSparkRDDMetadataWriteClient.java    |  4 +-
 .../client/functional/TestHoodieMetadataBase.java  |  4 +-
 .../hudi/common/config/HoodieMetadataConfig.java   | 31 ++++++++++++
 .../TestColStatsRecordWithMetadataRecord.java      |  3 +-
 20 files changed, 275 insertions(+), 31 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
index a83f1ffd6cf..8ebfb45ca08 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/TransactionUtils.java
@@ -72,7 +72,7 @@ public class TransactionUtils {
       boolean timelineRefreshedWithinTransaction,
       Set<String> pendingInstants) throws HoodieWriteConflictException {
     WriteOperationType operationType = 
thisCommitMetadata.map(HoodieCommitMetadata::getOperationType).orElse(null);
-    if (config.needResolveWriteConflict(operationType)) {
+    if (config.needResolveWriteConflict(operationType, 
table.isMetadataTable(), config, table.getMetaClient().getTableConfig())) {
       // deal with pendingInstants
       if (!timelineRefreshedWithinTransaction) {
         table.getMetaClient().reloadActiveTimeline();
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index ae34ee3337e..289863cc3a4 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2746,16 +2746,24 @@ public class HoodieWriteConfig extends HoodieConfig {
     return props.getInteger(WRITES_FILEID_ENCODING, 
HoodieMetadataPayload.RECORD_INDEX_FIELD_FILEID_ENCODING_UUID);
   }
 
-  public boolean needResolveWriteConflict(WriteOperationType operationType) {
+  public boolean needResolveWriteConflict(WriteOperationType operationType, 
boolean isMetadataTable, HoodieWriteConfig config,
+                                          HoodieTableConfig tableConfig) {
     WriteConcurrencyMode mode = getWriteConcurrencyMode();
     switch (mode) {
       case SINGLE_WRITER:
         return false;
       case OPTIMISTIC_CONCURRENCY_CONTROL:
         return true;
-      case NON_BLOCKING_CONCURRENCY_CONTROL:
-        // NB-CC don't need to resolve write conflict except bulk insert 
operation
-        return WriteOperationType.BULK_INSERT == operationType;
+      case NON_BLOCKING_CONCURRENCY_CONTROL: {
+        if (isMetadataTable) {
+          // Data table NB-CC is still evolving and may change from its current form.
+          // For the metadata table, when streaming writes are enabled, no two writers
+          // can conflict, so no conflict resolution is needed.
+          return false;
+        } else {
+          // NB-CC does not need to resolve write conflicts except for the bulk insert operation.
+          return WriteOperationType.BULK_INSERT == operationType;
+        }
+      }
       default:
         throw new IllegalArgumentException("Invalid WriteConcurrencyMode " + 
mode);
     }
@@ -2800,6 +2808,29 @@ public class HoodieWriteConfig extends HoodieConfig {
     return metadataConfig.getSecondaryIndexParallelism();
   }
 
+  /**
+   * Whether streaming writes to the metadata table are enabled.
+   * Streaming writes are supported only in the SPARK engine (due to the intricacies of Spark task retries)
+   * and for table version >= 8, since NBCC is a prerequisite.
+   *
+   * <p>To support streaming writes, the metadata table needs NBCC, because an ingestion writer and a
+   * table service from the data table could concurrently try to write to the metadata table.
+   *
+   * <p>In Spark, when streaming writes are enabled, incremental operations from the data table (insert,
+   * upsert, delete) and table services (compaction and clustering) take the streaming write flow, while
+   * all other operations (delete_partition, insert_overwrite, etc.) go through the legacy metadata write
+   * paths, since these might involve reading entire partitions rather than relying purely on the
+   * incrementally written data.
+   *
+   * @param tableVersion {@link HoodieTableVersion} of interest.
+   * @return true if streaming writes are enabled, false otherwise.
+   */
+  public boolean isMetadataStreamingWritesEnabled(HoodieTableVersion 
tableVersion) {
+    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+      return metadataConfig.isStreamingWriteEnabled();
+    } else {
+      return false;
+    }
+  }
+
   public static class Builder {
 
     protected final HoodieWriteConfig writeConfig = new HoodieWriteConfig();
@@ -3478,9 +3509,10 @@ public class HoodieWriteConfig extends HoodieConfig {
                 writeConcurrencyMode.name()));
       }
       if (writeConcurrencyMode == 
WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL) {
+        boolean isMetadataTable = 
HoodieTableMetadata.isMetadataTable(writeConfig.getBasePath());
         checkArgument(
-            writeConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ) 
&& writeConfig.isSimpleBucketIndex(),
-            "Non-blocking concurrency control requires the MOR table with 
simple bucket index");
+            writeConfig.getTableType().equals(HoodieTableType.MERGE_ON_READ) 
&& (isMetadataTable || writeConfig.isSimpleBucketIndex()),
+            "Non-blocking concurrency control requires the MOR table with 
simple bucket index or it has to be Metadata table");
       }
 
       HoodieCleaningPolicy cleaningPolicy = 
HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
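
A sketch of the new gating behavior, mirroring the testStreamingWritesToMetadataConfig
case added further down in this patch (paths and anything not shown are assumed
defaults): streaming writes only take effect for table version >= 8 and default to
enabled only for the Spark engine.

    HoodieWriteConfig sparkConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi_table")        // illustrative path
        .withEngineType(EngineType.SPARK)
        .build();
    sparkConfig.isMetadataStreamingWritesEnabled(HoodieTableVersion.EIGHT); // true (Spark default)
    sparkConfig.isMetadataStreamingWritesEnabled(HoodieTableVersion.SIX);   // false, pre-NBCC table version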
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 104e0a51946..f3cae39ce82 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -191,7 +191,7 @@ public abstract class HoodieBackedTableMetadataWriter<I, O> 
implements HoodieTab
         
.setTimeGeneratorConfig(dataWriteConfig.getTimeGeneratorConfig()).build();
     this.enabledPartitionTypes = 
getEnabledPartitions(dataWriteConfig.getMetadataConfig(), dataMetaClient);
     if (writeConfig.isMetadataTableEnabled()) {
-      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, 
failedWritesCleaningPolicy);
+      this.metadataWriteConfig = createMetadataWriteConfig(writeConfig, 
failedWritesCleaningPolicy, dataMetaClient.getTableConfig().getTableVersion());
       try {
         initRegistry();
         initialized = initializeIfNeeded(dataMetaClient, 
inflightInstantTimestamp);
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
index d0fec373275..b2e114dfd1b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataWriteUtils.java
@@ -20,6 +20,7 @@ package org.apache.hudi.metadata;
 
 import org.apache.hudi.avro.model.HoodieMetadataRecord;
 import org.apache.hudi.client.FailOnFirstErrorWriteStatus;
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.RecordMergeMode;
@@ -28,15 +29,18 @@ import org.apache.hudi.common.model.HoodieAvroRecordMerger;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecordMerger;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteConcurrencyMode;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.common.util.VisibleForTesting;
 import org.apache.hudi.config.HoodieArchivalConfig;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieLockConfig;
 import org.apache.hudi.config.HoodiePayloadConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -84,8 +88,19 @@ public class HoodieMetadataWriteUtils {
    */
   @VisibleForTesting
   public static HoodieWriteConfig createMetadataWriteConfig(
-      HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy) {
+      HoodieWriteConfig writeConfig, HoodieFailedWritesCleaningPolicy 
failedWritesCleaningPolicy,
+      HoodieTableVersion datatableVersion) {
     String tableName = writeConfig.getTableName() + METADATA_TABLE_NAME_SUFFIX;
+    boolean isStreamingWritesToMetadataEnabled = 
writeConfig.isMetadataStreamingWritesEnabled(datatableVersion);
+    WriteConcurrencyMode concurrencyMode = isStreamingWritesToMetadataEnabled
+        ? WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL : 
WriteConcurrencyMode.SINGLE_WRITER;
+    HoodieLockConfig lockConfig = isStreamingWritesToMetadataEnabled
+        ? 
HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class).build()
 : HoodieLockConfig.newBuilder().build();
+    // HUDI-9407 tracks adding support for separate lock configuration for 
MDT. Until then, all writes to MDT will happen within data table lock.
+
+    if (isStreamingWritesToMetadataEnabled) {
+      failedWritesCleaningPolicy = HoodieFailedWritesCleaningPolicy.LAZY;
+    }
 
     final long maxLogFileSizeBytes = 
writeConfig.getMetadataConfig().getMaxLogFileSize();
     // Borrow the cleaner policy from the main table and adjust the cleaner 
policy based on the main table's cleaner policy
@@ -119,8 +134,8 @@ public class HoodieMetadataWriteUtils {
             
.withMaxConsistencyCheckIntervalMs(writeConfig.getConsistencyGuardConfig().getMaxConsistencyCheckIntervalMs())
             
.withMaxConsistencyChecks(writeConfig.getConsistencyGuardConfig().getMaxConsistencyChecks())
             .build())
-        .withWriteConcurrencyMode(WriteConcurrencyMode.SINGLE_WRITER)
-        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false)
+            
.withFileListingParallelism(writeConfig.getFileListingParallelism()).build())
         .withAvroSchemaValidate(false)
         .withEmbeddedTimelineServerEnabled(false)
         .withMarkersType(MarkerType.DIRECT.name())
@@ -166,10 +181,13 @@ public class HoodieMetadataWriteUtils {
         .withPayloadConfig(HoodiePayloadConfig.newBuilder()
             
.withPayloadClass(HoodieMetadataPayload.class.getCanonicalName()).build())
         
.withRecordMergeImplClasses(HoodieAvroRecordMerger.class.getCanonicalName())
-        .withWriteRecordPositionsEnabled(false);
+        .withWriteRecordPositionsEnabled(false)
+        .withWriteConcurrencyMode(concurrencyMode)
+        .withLockConfig(lockConfig);
 
     // RecordKey properties are needed for the metadata table records
     final Properties properties = new Properties();
+    properties.put(HoodieTableConfig.TYPE.key(), 
HoodieTableType.MERGE_ON_READ.name());
     properties.put(HoodieTableConfig.RECORDKEY_FIELDS.key(), 
RECORD_KEY_FIELD_NAME);
     properties.put("hoodie.datasource.write.recordkey.field", 
RECORD_KEY_FIELD_NAME);
     if (nonEmpty(writeConfig.getMetricReporterMetricsNamePrefix())) {
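
For reference, a sketch of the metadata-table write config derived here when streaming
writes are on versus off, mirroring the testCreateMetadataWriteConfigForNBCC test added
below; dataWriteConfig is an assumed data-table config on table version 8 with Spark
defaults.

    HoodieWriteConfig mdtConfig = HoodieMetadataWriteUtils.createMetadataWriteConfig(
        dataWriteConfig, HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT);
    // With streaming writes enabled (the Spark default):
    //   - write concurrency mode -> NON_BLOCKING_CONCURRENCY_CONTROL
    //   - lock provider          -> InProcessLockProvider
    //   - failed-writes cleaning -> LAZY (overriding the EAGER argument)
    // With hoodie.metadata.streaming.write.enabled=false, the derived config falls back
    // to SINGLE_WRITER with no lock provider and keeps the EAGER cleaning policy.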
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
index 27f0320da9c..da391abe78f 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
@@ -139,7 +139,7 @@ public class UpgradeDowngrade {
           HoodieTableMetaClient mdtMetaClient = HoodieTableMetaClient.builder()
               
.setConf(metaClient.getStorageConf().newInstance()).setBasePath(metadataTablePath).build();
           HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(
-              config, HoodieFailedWritesCleaningPolicy.EAGER);
+              config, HoodieFailedWritesCleaningPolicy.EAGER, 
metaClient.getTableConfig().getTableVersion());
           new UpgradeDowngrade(mdtMetaClient, mdtWriteConfig, context, 
upgradeDowngradeHelper)
               .run(toVersion, instantTime);
         }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
index 32de4ab768a..ea7ddac7aa4 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/transaction/TestConflictResolutionStrategyUtil.java
@@ -26,6 +26,7 @@ import 
org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.avro.model.HoodieSliceInfo;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieReplaceCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
@@ -53,9 +54,13 @@ public class TestConflictResolutionStrategyUtil {
     writeStat.setFileId("file-1");
     
commitMetadata.addWriteStat(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
 writeStat);
     commitMetadata.setOperationType(WriteOperationType.INSERT);
-    HoodieTestTable.of(metaClient)
-        .addCommit(instantTime, Option.of(commitMetadata))
-        
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
+    HoodieTestTable testTable = HoodieTestTable.of(metaClient);
+    if (metaClient.getTableConfig().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
+      testTable = testTable.addCommit(instantTime, Option.of(commitMetadata));
+    } else {
+      testTable = testTable.addDeltaCommit(instantTime, Option.empty(), 
commitMetadata);
+    }
+    
testTable.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH,
 fileId1, fileId2);
   }
 
   public static HoodieCommitMetadata createCommitMetadata(String instantTime, 
String writeFileName) {
@@ -75,9 +80,15 @@ public class TestConflictResolutionStrategyUtil {
   public static void createInflightCommit(String instantTime, 
HoodieTableMetaClient metaClient) throws Exception {
     String fileId1 = "file-" + instantTime + "-1";
     String fileId2 = "file-" + instantTime + "-2";
-    HoodieTestTable.of(metaClient)
-        .addInflightCommit(instantTime)
-        
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
+    if (metaClient.getTableConfig().getTableType() == 
HoodieTableType.COPY_ON_WRITE) {
+      HoodieTestTable.of(metaClient)
+          .addInflightCommit(instantTime)
+          
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
+    } else {
+      HoodieTestTable.of(metaClient)
+          .addInflightDeltaCommit(instantTime)
+          
.withBaseFilesInPartition(HoodieTestDataGenerator.DEFAULT_FIRST_PARTITION_PATH, 
fileId1, fileId2);
+    }
   }
 
   public static void createCompactionRequested(String instantTime, 
HoodieTableMetaClient metaClient) throws Exception {
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java
index 0dad70501a5..c243a49d599 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestTransactionUtils.java
@@ -18,28 +18,35 @@
 
 package org.apache.hudi.client.utils;
 
+import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.HoodieCommonTestHarness;
 import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieWriteConflictException;
 import org.apache.hudi.table.HoodieTable;
 
 import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.Properties;
 
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommit;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createCommitMetadata;
 import static 
org.apache.hudi.client.transaction.TestConflictResolutionStrategyUtil.createInflightCommit;
+import static 
org.apache.hudi.common.model.WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL;
 import static 
org.apache.hudi.common.model.WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
 import static org.junit.jupiter.api.Assertions.assertThrows;
@@ -85,4 +92,44 @@ class TestTransactionUtils extends HoodieCommonTestHarness {
             lastSuccessfulInstant, timelineRefreshedWithinTransaction, 
Collections.singleton(newInstantTime)));
     verify(spyMetaClient, times(timelineRefreshedWithinTransaction ? 0 : 
1)).reloadActiveTimeline();
   }
+
+  @Test
+  void resolveWriteConflictIfAnyNoExceptionForMetadataTable() throws Exception 
{
+    // instantiate MOR table for metadata table.
+    metaClient = HoodieTestUtils.init(basePath + "/.hoodie/metadata/", 
HoodieTableType.MERGE_ON_READ);
+
+    createCommit(HoodieTestTable.makeNewCommitTime(), metaClient);
+    HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
+    // consider commits before this are all successful
+    Option<HoodieInstant> lastSuccessfulInstant = 
timeline.getCommitsTimeline().filterCompletedInstants().lastInstant();
+    // writer 1 starts
+    String currentWriterInstant = HoodieTestTable.makeNewCommitTime();
+    createInflightCommit(currentWriterInstant, metaClient);
+    // writer 2 starts and finishes
+    String newInstantTime = HoodieTestTable.makeNewCommitTime();
+    createCommit(newInstantTime, metaClient);
+
+    Option<HoodieInstant> currentInstant = 
Option.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.INFLIGHT, 
HoodieTimeline.DELTA_COMMIT_ACTION, currentWriterInstant));
+    HoodieCommitMetadata currentMetadata = 
createCommitMetadata(currentWriterInstant, "file-1");
+
+    // mimic all props for metadata table.
+    Properties props = new Properties();
+    props.setProperty(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), 
"true");
+    props.setProperty(HoodieTableConfig.TYPE.key(), 
HoodieTableType.MERGE_ON_READ.name());
+
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath(metaClient.getBasePath().toString())
+        .withWriteConcurrencyMode(NON_BLOCKING_CONCURRENCY_CONTROL)
+        .withProperties(props)
+        .build();
+    metaClient.reloadActiveTimeline();
+    HoodieTable table = mock(HoodieTable.class);
+    when(table.isMetadataTable()).thenReturn(true);
+    HoodieTableMetaClient spyMetaClient = spy(metaClient);
+    when(table.getMetaClient()).thenReturn(spyMetaClient);
+    Option<HoodieCommitMetadata> actualResult = 
TransactionUtils.resolveWriteConflictIfAny(table, currentInstant, 
Option.of(currentMetadata), writeConfig,
+            lastSuccessfulInstant, false, 
Collections.singleton(newInstantTime));
+    // since conflict resolution is bypassed entirely for the metadata table
+    verify(spyMetaClient, times(0)).reloadActiveTimeline();
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
index 90709ce8c7d..7d2563c5f2a 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodieWriteConfig.java
@@ -57,6 +57,7 @@ import java.util.Set;
 import java.util.function.Function;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -725,6 +726,32 @@ public class TestHoodieWriteConfig {
     assertEquals(bloomFilterType, config.getBloomFilterType());
   }
 
+  @Test
+  public void testStreamingWritesToMetadataConfig() {
+    Properties props = new Properties();
+    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withProperties(props)
+        .withEngineType(EngineType.SPARK).build();
+
+    
assertTrue(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.EIGHT));
+    
assertFalse(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.SIX));
+
+    config = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withProperties(props)
+        .withEngineType(EngineType.FLINK).build();
+    
assertFalse(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.SIX));
+    
assertFalse(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.EIGHT));
+
+    config = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp")
+        .withProperties(props)
+        .withEngineType(EngineType.JAVA).build();
+    
assertFalse(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.SIX));
+    
assertFalse(config.isMetadataStreamingWritesEnabled(HoodieTableVersion.EIGHT));
+  }
+
   private HoodieWriteConfig createWriteConfig(Map<String, String> configs) {
     final Properties properties = new Properties();
     configs.forEach(properties::setProperty);
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
index 529d2ddfc7f..ee1fd587e04 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/metadata/TestHoodieMetadataWriteUtils.java
@@ -18,29 +18,36 @@
 
 package org.apache.hudi.metadata;
 
+import org.apache.hudi.client.transaction.lock.InProcessLockProvider;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.model.HoodieCleaningPolicy;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
+import org.apache.hudi.common.model.WriteConcurrencyMode;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.config.HoodieCleanConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
 
 import org.junit.jupiter.api.Test;
 
+import java.util.Properties;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
 
 public class TestHoodieMetadataWriteUtils {
 
   @Test
   public void testCreateMetadataWriteConfigForCleaner() {
     HoodieWriteConfig writeConfig1 = HoodieWriteConfig.newBuilder()
-        .withPath("/tmp")
+        .withPath("/tmp/")
         .withCleanConfig(HoodieCleanConfig.newBuilder()
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
             .retainCommits(5).build())
         .build();
 
-    HoodieWriteConfig metadataWriteConfig1 = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig1, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    HoodieWriteConfig metadataWriteConfig1 = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig1, 
HoodieFailedWritesCleaningPolicy.EAGER,
+        HoodieTableVersion.SIX);
     assertEquals(HoodieFailedWritesCleaningPolicy.EAGER, 
metadataWriteConfig1.getFailedWritesCleanPolicy());
     assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, 
metadataWriteConfig1.getCleanerPolicy());
     // default value already greater than data cleaner commits retained * 1.2
@@ -55,10 +62,53 @@ public class TestHoodieMetadataWriteUtils {
             .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
             .retainCommits(20).build())
         .build();
-    HoodieWriteConfig metadataWriteConfig2 = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig2, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    HoodieWriteConfig metadataWriteConfig2 = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig2, 
HoodieFailedWritesCleaningPolicy.EAGER,
+        HoodieTableVersion.SIX);
     assertEquals(HoodieFailedWritesCleaningPolicy.EAGER, 
metadataWriteConfig2.getFailedWritesCleanPolicy());
     assertEquals(HoodieCleaningPolicy.KEEP_LATEST_COMMITS, 
metadataWriteConfig2.getCleanerPolicy());
     // data cleaner commits retained * 1.2 is greater than default
     assertEquals(24, metadataWriteConfig2.getCleanerCommitsRetained());
   }
+
+  @Test
+  public void testCreateMetadataWriteConfigForNBCC() {
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp/base_path/.hoodie/metadata/")
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(5).build())
+        .build();
+
+    HoodieWriteConfig metadataWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, 
HoodieFailedWritesCleaningPolicy.EAGER,
+        HoodieTableVersion.EIGHT);
+    validateMetadataWriteConfig(metadataWriteConfig, 
HoodieFailedWritesCleaningPolicy.LAZY,
+        WriteConcurrencyMode.NON_BLOCKING_CONCURRENCY_CONTROL, 
InProcessLockProvider.class.getCanonicalName());
+
+    // disable streaming writes to metadata table.
+    Properties properties = new Properties();
+    properties.put(HoodieMetadataConfig.STREAMING_WRITE_ENABLED.key(), 
"false");
+    writeConfig = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp/base_path/.hoodie/metadata/")
+        .withCleanConfig(HoodieCleanConfig.newBuilder()
+            .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+            .retainCommits(5).build())
+        .withProperties(properties)
+        .build();
+
+    metadataWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, 
HoodieFailedWritesCleaningPolicy.EAGER,
+        HoodieTableVersion.EIGHT);
+    validateMetadataWriteConfig(metadataWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER,
+        WriteConcurrencyMode.SINGLE_WRITER, null);
+  }
+
+  private void validateMetadataWriteConfig(HoodieWriteConfig 
metadataWriteConfig, HoodieFailedWritesCleaningPolicy expectedPolicy,
+                                           WriteConcurrencyMode 
expectedWriteConcurrencyMode, String expectedLockProviderClass) {
+    assertEquals(expectedPolicy, 
metadataWriteConfig.getFailedWritesCleanPolicy());
+    assertEquals(expectedWriteConcurrencyMode, 
metadataWriteConfig.getWriteConcurrencyMode());
+    if (expectedLockProviderClass != null) {
+      assertEquals(expectedLockProviderClass, 
metadataWriteConfig.getLockProviderClass());
+    } else {
+      assertNull(metadataWriteConfig.getLockProviderClass());
+    }
+  }
 }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
index f2513ebf2b7..9a413cedc44 100644
--- 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/utils/HoodieWriterClientTestHarness.java
@@ -29,6 +29,7 @@ import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.config.LockConfiguration;
 import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieListData;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.ConsistencyGuardConfig;
 import org.apache.hudi.common.fs.FSUtils;
@@ -207,6 +208,10 @@ public abstract class HoodieWriterClientTestHarness 
extends HoodieCommonTestHarn
     return getConfigBuilder(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
   }
 
+  protected EngineType getEngineType() {
+    return EngineType.SPARK;
+  }
+
   /**
    * Get Config builder with default configs set.
    *
@@ -256,6 +261,7 @@ public abstract class HoodieWriterClientTestHarness extends 
HoodieCommonTestHarn
     if (StringUtils.nonEmpty(schemaStr)) {
       builder.withSchema(schemaStr);
     }
+    builder.withEngineType(getEngineType());
     return builder;
   }
 
@@ -1062,7 +1068,7 @@ public abstract class HoodieWriterClientTestHarness 
extends HoodieCommonTestHarn
    * @throws Exception in case of failure
    */
   protected void testAutoCommit(Function3<Object, BaseHoodieWriteClient, 
Object, String> writeFn,
-                              boolean isPrepped, boolean populateMetaFields, 
InstantGenerator instantGenerator) throws Exception {
+                                boolean isPrepped, boolean populateMetaFields, 
InstantGenerator instantGenerator) throws Exception {
     HoodieWriteConfig.Builder cfgBuilder = getConfigBuilder();
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
     try (BaseHoodieWriteClient client = 
getHoodieWriteClient(cfgBuilder.build())) {
@@ -1191,7 +1197,7 @@ public abstract class HoodieWriterClientTestHarness 
extends HoodieCommonTestHarn
 
     HoodieWriteConfig.Builder cfgBuilder = 
getConfigBuilder(HoodieFailedWritesCleaningPolicy.LAZY).withRollbackUsingMarkers(true)
         
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withMetadataIndexColumnStats(true).withColumnStatsIndexForColumns("driver,rider")
-            .withMetadataIndexColumnStatsFileGroupCount(1).build())
+            
.withMetadataIndexColumnStatsFileGroupCount(1).withEngineType(getEngineType()).build())
         .withWriteTableVersion(6);
 
     addConfigsForPopulateMetaFields(cfgBuilder, populateMetaFields);
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 1a88cd6ec48..3d9958d0f4d 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -312,7 +312,7 @@ public class HoodieFlinkWriteClient<T>
    * should be called before the Driver starts a new transaction with a 
reloaded metaclient.
    */
   public void preTxn(WriteOperationType operationType, HoodieTableMetaClient 
metaClient) {
-    if (txnManager.isLockRequired() && 
config.needResolveWriteConflict(operationType)) {
+    if (txnManager.isLockRequired() && 
config.needResolveWriteConflict(operationType, metaClient.isMetadataTable(), 
config, metaClient.getTableConfig())) {
       this.lastCompletedTxnAndMetadata = 
TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient);
       this.pendingInflightAndRequestedInstants = 
TransactionUtils.getInflightAndRequestedInstants(metaClient);
     }
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
index 68258674aec..27f319c9739 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/TestJavaHoodieBackedMetadata.java
@@ -507,6 +507,7 @@ public class TestJavaHoodieBackedMetadata extends 
TestHoodieMetadataBase {
             .withMetadataIndexColumnStats(true)
             .withMetadataIndexBloomFilterFileGroups(2)
             .withMaxNumDeltaCommitsBeforeCompaction(12) // cannot restore to 
before the oldest compaction on MDT as there are no base files before that time
+            .withEngineType(EngineType.JAVA)
             .build())
         .build();
     // module com.fasterxml.jackson.datatype:jackson-datatype-jsr310 is needed 
for proper column stats processing for Jackson >= 2.11
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
index ac71581c42d..a8d4e5990f1 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/client/functional/TestHoodieJavaClientOnCopyOnWriteStorage.java
@@ -24,6 +24,7 @@ import org.apache.hudi.client.WriteStatus;
 import 
org.apache.hudi.client.clustering.plan.strategy.JavaSizeBasedClusteringPlanStrategy;
 import 
org.apache.hudi.client.clustering.run.strategy.JavaSortAndSizeExecutionStrategy;
 import org.apache.hudi.client.common.JavaTaskContextSupplier;
+import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
@@ -77,6 +78,10 @@ public class TestHoodieJavaClientOnCopyOnWriteStorage 
extends HoodieJavaClientTe
     );
   }
 
+  protected EngineType getEngineType() {
+    return EngineType.JAVA;
+  }
+
   @BeforeEach
   public void setUpTestTable() {
     testTable = HoodieMetadataTestTable.of(metaClient);
diff --git 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
index 8c6b55547f2..b555b769646 100644
--- 
a/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
+++ 
b/hudi-client/hudi-java-client/src/test/java/org/apache/hudi/testutils/TestHoodieMetadataBase.java
@@ -28,6 +28,7 @@ import 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -309,15 +310,18 @@ public class TestHoodieMetadataBase extends 
HoodieJavaClientTestHarness {
             .enableMetrics(enableMetrics)
             .ignoreSpuriousDeletes(validateMetadataPayloadConsistency)
             .withMetadataIndexColumnStats(false) // HUDI-8774
+            .withEngineType(EngineType.JAVA)
             .build())
         .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
             
.withExecutorMetrics(enableMetrics).withReporterType(MetricsReporterType.INMEMORY.name()).build())
         .withRollbackUsingMarkers(useRollbackUsingMarkers)
-        .withProperties(properties);
+        .withProperties(properties)
+        .withEngineType(EngineType.JAVA);
   }
 
   protected HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig 
writeConfig) {
-    return HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, 
HoodieFailedWritesCleaningPolicy.LAZY);
+    return HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, 
HoodieFailedWritesCleaningPolicy.LAZY,
+        HoodieTableVersion.EIGHT);
   }
 
   protected HoodieTableMetaClient createMetaClientForMetadataTable() {
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
index a94f68759fc..f3f897d3915 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDMetadataWriteClient.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieFileGroupId;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.timeline.TimelineUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
@@ -57,6 +58,11 @@ public class SparkRDDMetadataWriteClient<T> extends 
SparkRDDWriteClient<T> {
     super(context, writeConfig, timelineService);
   }
 
+  @Override
+  public String createNewInstantTime() {
+    return TimelineUtils.generateInstantTime(false, timeGenerator);
+  }
+
   /**
    * Upserts the given prepared records into the Hoodie table, at the supplied 
instantTime.
    * <p>
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
index 3dcb60db62f..7c555c958c6 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/metadata/SparkHoodieBackedTableMetadataWriter.java
@@ -20,6 +20,7 @@ package org.apache.hudi.metadata;
 
 import org.apache.hudi.client.BaseHoodieWriteClient;
 import org.apache.hudi.client.HoodieWriteResult;
+import org.apache.hudi.client.SparkRDDMetadataWriteClient;
 import org.apache.hudi.client.SparkRDDWriteClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.utils.SparkMetadataWriterUtils;
@@ -247,7 +248,7 @@ public class SparkHoodieBackedTableMetadataWriter extends 
HoodieBackedTableMetad
 
   @Override
   public BaseHoodieWriteClient<?, JavaRDD<HoodieRecord>, ?, 
JavaRDD<WriteStatus>> initializeWriteClient() {
-    return new SparkRDDWriteClient(engineContext, metadataWriteConfig, 
Option.empty());
+    return new SparkRDDMetadataWriteClient(engineContext, metadataWriteConfig, 
Option.empty());
   }
 
   @Override
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
index 00cef9163eb..ddda4a422d5 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDMetadataWriteClient.java
@@ -33,6 +33,7 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -99,7 +100,8 @@ public class TestSparkRDDMetadataWriteClient extends 
HoodieClientTestBase {
     initDataTableWithACommit(hoodieWriteConfig);
 
     // fetch metadata file slice info
-    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(hoodieWriteConfig, 
HoodieFailedWritesCleaningPolicy.EAGER,
+        HoodieTableVersion.EIGHT);
     Map<String, List<String>> mdtPartitionsFileIdMapping = new HashMap<>();
     List<HoodieFileGroupId> nonFilesPartitionFileGroupIdList = new 
ArrayList<>();
     List<HoodieFileGroupId> filesPartitionFileGroupIdList = new ArrayList<>();
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 12456dfd223..bb674484f98 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -28,6 +28,7 @@ import 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieMetadataTestTable;
 import org.apache.hudi.common.testutils.HoodieTestTable;
@@ -354,6 +355,7 @@ public class TestHoodieMetadataBase extends 
HoodieSparkClientTestHarness {
   }
 
   protected HoodieWriteConfig getMetadataWriteConfig(HoodieWriteConfig 
writeConfig) {
-    return HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig, 
HoodieFailedWritesCleaningPolicy.LAZY);
+    return HoodieMetadataWriteUtils.createMetadataWriteConfig(writeConfig,
+        HoodieFailedWritesCleaningPolicy.LAZY, HoodieTableVersion.EIGHT);
   }
 }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index f1a91e8227b..5de0600ccd5 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -71,6 +71,15 @@ public final class HoodieMetadataConfig extends HoodieConfig 
{
       .sinceVersion("0.7.0")
       .withDocumentation("Enable the internal metadata table which serves 
table metadata like level file listings");
 
+  public static final ConfigProperty<Boolean> STREAMING_WRITE_ENABLED = 
ConfigProperty
+      .key(METADATA_PREFIX + ".streaming.write.enabled")
+      .defaultValue(false)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Whether to enable streaming writes to metadata table 
or not. With streaming writes, we execute writes to both data table and 
metadata table "
+          + "in streaming manner rather than two disjoint writes. By default "
+          + "streaming writes to metadata table is enabled for SPARK engine 
for incremental operations and disabled for all other cases.");
+
   public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true;
 
   // Enable metrics for internal Metadata Table
@@ -466,6 +475,10 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     return getBoolean(ENABLE);
   }
 
+  public boolean isStreamingWriteEnabled() {
+    return getBoolean(STREAMING_WRITE_ENABLED);
+  }
+
   public boolean isBloomFilterIndexEnabled() {
     return getBooleanOrDefault(ENABLE_METADATA_INDEX_BLOOM_FILTER);
   }
@@ -704,6 +717,11 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       return this;
     }
 
+    public Builder withStreamingWriteEnabled(boolean enabled) {
+      metadataConfig.setValue(STREAMING_WRITE_ENABLED, 
String.valueOf(enabled));
+      return this;
+    }
+
     public Builder withMetadataIndexBloomFilter(boolean enable) {
       metadataConfig.setValue(ENABLE_METADATA_INDEX_BLOOM_FILTER, 
String.valueOf(enable));
       return this;
@@ -942,6 +960,7 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_COLUMN_STATS, 
getDefaultColStatsEnable(engineType));
       metadataConfig.setDefaultValue(ENABLE_METADATA_INDEX_PARTITION_STATS, 
metadataConfig.isColumnStatsIndexEnabled());
       metadataConfig.setDefaultValue(SECONDARY_INDEX_ENABLE_PROP, 
getDefaultSecondaryIndexEnable(engineType));
+      metadataConfig.setDefaultValue(STREAMING_WRITE_ENABLED, 
getDefaultForStreamingWriteEnabled(engineType));
       // fix me: disable when schema on read is enabled.
       metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
       return metadataConfig;
@@ -959,6 +978,18 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
       }
     }
 
+    private boolean getDefaultForStreamingWriteEnabled(EngineType engineType) {
+      switch (engineType) {
+        case SPARK:
+          return true;
+        case FLINK:
+        case JAVA:
+          return false;
+        default:
+          throw new HoodieNotSupportedException("Unsupported engine " + 
engineType);
+      }
+    }
+
     private boolean getDefaultColStatsEnable(EngineType engineType) {
       switch (engineType) {
         case SPARK:
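
A small sketch of the new builder hook and the engine-dependent default added here
(builder calls other than those shown are assumed to be defaults):

    // Default resolution: enabled for SPARK, disabled for FLINK and JAVA.
    HoodieMetadataConfig sparkDefaults = HoodieMetadataConfig.newBuilder()
        .withEngineType(EngineType.SPARK)
        .build();
    sparkDefaults.isStreamingWriteEnabled(); // true

    // Explicit override via the builder method added in this commit.
    HoodieMetadataConfig explicitlyOff = HoodieMetadataConfig.newBuilder()
        .withStreamingWriteEnabled(false)
        .build();
    explicitlyOff.isStreamingWriteEnabled(); // false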
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
index 316a662cee8..8026fd24385 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/functional/TestColStatsRecordWithMetadataRecord.java
@@ -31,6 +31,7 @@ import 
org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.testutils.InProcessTimeGenerator;
 import org.apache.hudi.common.util.FileFormatUtils;
 import org.apache.hudi.common.util.Functions;
@@ -193,7 +194,7 @@ public class TestColStatsRecordWithMetadataRecord extends 
HoodieSparkClientTestH
       }
     });
 
-    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(cfg, 
HoodieFailedWritesCleaningPolicy.EAGER);
+    HoodieWriteConfig mdtWriteConfig = 
HoodieMetadataWriteUtils.createMetadataWriteConfig(cfg, 
HoodieFailedWritesCleaningPolicy.EAGER, HoodieTableVersion.EIGHT);
     HoodieTableMetaClient mdtMetaClient = 
HoodieTableMetaClient.builder().setBasePath(mdtWriteConfig.getBasePath()).setConf(context.getStorageConf().newInstance()).build();
 
     HoodieTable table = HoodieSparkTable.create(mdtWriteConfig, context, 
mdtMetaClient);
