This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 14bde81ed0da perf: Fixing streaming dag performance wrt dt write 
status and mdt write status union (#13976)
14bde81ed0da is described below

commit 14bde81ed0da8c4a54fa3051530d7018693c95c3
Author: Sivabalan Narayanan <n.siv...@gmail.com>
AuthorDate: Wed Oct 1 08:30:17 2025 -0700

    perf: Fixing streaming dag performance wrt dt write status and mdt write 
status union (#13976)
---
 .../hudi/client/BaseHoodieTableServiceClient.java  |   8 +-
 .../apache/hudi/client/CoalescingPartitioner.java  |  47 ++++++
 .../hudi/client/SparkRDDTableServiceClient.java    |  35 +++-
 .../apache/hudi/client/SparkRDDWriteClient.java    |   5 +-
 .../hudi/client/StreamingMetadataWriteHandler.java |  44 +++--
 .../java/org/apache/hudi/data/HoodieJavaRDD.java   |   5 +
 .../hudi/client/TestCoalescingPartitioner.java     | 187 +++++++++++++++++++++
 .../hudi/client/TestSparkRDDWriteClient.java       |  88 ++++++++++
 .../client/TestStreamingMetadataWriteHandler.java  | 110 ++++++++++++
 .../org/apache/hudi/data/TestHoodieJavaRDD.java    |  25 +++
 .../hudi/common/config/HoodieMetadataConfig.java   |  13 ++
 .../org/apache/hudi/common/data/HoodieData.java    |   9 +
 .../apache/hudi/common/data/HoodieListData.java    |   6 +
 .../common/config/TestHoodieMetadataConfig.java    |  23 +++
 14 files changed, 587 insertions(+), 18 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
index cd06a875d1a6..f31d9358883b 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java
@@ -235,7 +235,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     logCompactionTimer = metrics.getLogCompactionCtx();
     WriteMarkersFactory.get(config.getMarkersType(), table, 
logCompactionInstantTime);
     HoodieWriteMetadata<T> writeMetadata = table.logCompact(context, 
logCompactionInstantTime);
-    HoodieWriteMetadata<T> updatedWriteMetadata = 
partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime);
+    HoodieWriteMetadata<T> updatedWriteMetadata = 
partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime, 
WriteOperationType.LOG_COMPACT);
     HoodieWriteMetadata<O> logCompactionMetadata = 
convertToOutputMetadata(updatedWriteMetadata);
     if (shouldComplete) {
       commitLogCompaction(logCompactionInstantTime, logCompactionMetadata, 
Option.of(table));
@@ -318,7 +318,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     }
     compactionTimer = metrics.getCompactionCtx();
     HoodieWriteMetadata<T> writeMetadata = table.compact(context, 
compactionInstantTime);
-    HoodieWriteMetadata<T> updatedWriteMetadata = 
partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime);
+    HoodieWriteMetadata<T> updatedWriteMetadata = 
partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime, 
WriteOperationType.COMPACT);
     HoodieWriteMetadata<O> compactionWriteMetadata = 
convertToOutputMetadata(updatedWriteMetadata);
     if (shouldComplete) {
       commitCompaction(compactionInstantTime, compactionWriteMetadata, 
Option.of(table));
@@ -331,7 +331,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
    *
    * @return The passed in {@code HoodieWriteMetadata} with probable partially 
updated write statuses.
    */
-  protected HoodieWriteMetadata<T> partialUpdateTableMetadata(HoodieTable 
table, HoodieWriteMetadata<T> writeMetadata, String instantTime) {
+  protected HoodieWriteMetadata<T> partialUpdateTableMetadata(HoodieTable 
table, HoodieWriteMetadata<T> writeMetadata, String instantTime, 
WriteOperationType writeOperationType) {
     return writeMetadata;
   }
 
@@ -487,7 +487,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> 
extends BaseHoodieCl
     clusteringTimer = metrics.getClusteringCtx();
     LOG.info("Starting clustering at {} for table {}", clusteringInstant, 
table.getConfig().getBasePath());
     HoodieWriteMetadata<T> writeMetadata = table.cluster(context, 
clusteringInstant);
-    HoodieWriteMetadata<T> updatedWriteMetadata = 
partialUpdateTableMetadata(table, writeMetadata, clusteringInstant);
+    HoodieWriteMetadata<T> updatedWriteMetadata = 
partialUpdateTableMetadata(table, writeMetadata, clusteringInstant, 
WriteOperationType.CLUSTER);
     HoodieWriteMetadata<O> clusteringMetadata = 
convertToOutputMetadata(updatedWriteMetadata);
 
     // TODO : Where is shouldComplete used ?
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/CoalescingPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/CoalescingPartitioner.java
new file mode 100644
index 000000000000..6a449ca39c95
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/CoalescingPartitioner.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.spark.Partitioner;
+
+/**
+ * Partitioner that spreads keys across a fixed number of partitions based on the key's hash code.
+ * With a single partition every key (even {@code null}) is routed to partition 0; otherwise the
+ * key must be non-null and is mapped to an index in {@code [0, numPartitions)}.
+ */
+public class CoalescingPartitioner extends Partitioner {
+
+  private final int numPartitions;
+
+  CoalescingPartitioner(int numPartitions) {
+    this.numPartitions = numPartitions;
+  }
+
+  @Override
+  public int numPartitions() {
+    return numPartitions;
+  }
+
+  @Override
+  public int getPartition(Object key) {
+    // Take the remainder before the absolute value: Math.abs(Integer.MIN_VALUE) is still negative,
+    // so abs-then-mod could return a negative (invalid) partition index for such a hash code.
+    return numPartitions == 1 ? 0 : Math.abs(key.hashCode() % numPartitions);
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
index a5e80f3fc755..77fe332c637f 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java
@@ -18,6 +18,7 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.utils.SparkReleaseResources;
 import org.apache.hudi.client.utils.SparkValidatorUtils;
@@ -26,10 +27,16 @@ import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.exception.HoodieClusteringException;
+import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.metadata.HoodieTableMetadata;
 import org.apache.hudi.storage.StorageConfiguration;
 import org.apache.hudi.table.HoodieSparkTable;
@@ -42,13 +49,24 @@ import java.util.Collections;
 import java.util.List;
 import java.util.stream.Collectors;
 
+import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
+
 public class SparkRDDTableServiceClient<T> extends 
BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, 
HoodieData<WriteStatus>, JavaRDD<WriteStatus>> {
 
-  private final StreamingMetadataWriteHandler streamingMetadataWriteHandler = 
new StreamingMetadataWriteHandler();
+  private final StreamingMetadataWriteHandler streamingMetadataWriteHandler;
   protected SparkRDDTableServiceClient(HoodieEngineContext context,
                                        HoodieWriteConfig clientConfig,
                                        Option<EmbeddedTimelineService> 
timelineService) {
+    this(context, clientConfig, timelineService, new 
StreamingMetadataWriteHandler());
+  }
+
+  @VisibleForTesting
+  public SparkRDDTableServiceClient(HoodieEngineContext context,
+                                       HoodieWriteConfig clientConfig,
+                                       Option<EmbeddedTimelineService> 
timelineService,
+                                       StreamingMetadataWriteHandler 
streamingMetadataWriteHandler) {
     super(context, clientConfig, timelineService);
+    this.streamingMetadataWriteHandler = streamingMetadataWriteHandler;
   }
 
   @Override
@@ -80,9 +98,20 @@ public class SparkRDDTableServiceClient<T> extends 
BaseHoodieTableServiceClient<
   protected HoodieWriteMetadata<HoodieData<WriteStatus>> 
partialUpdateTableMetadata(
       HoodieTable table,
       HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata,
-      String instantTime) {
+      String instantTime,
+      WriteOperationType writeOperationType) {
     if (isStreamingWriteToMetadataEnabled(table)) {
-      
writeMetadata.setWriteStatuses(streamingMetadataWriteHandler.streamWriteToMetadataTable(table,
 writeMetadata.getWriteStatuses(), instantTime));
+      boolean enforceCoalesceWithRepartition = writeOperationType == 
WriteOperationType.CLUSTER && config.getBulkInsertSortMode() == 
BulkInsertSortMode.NONE;
+      if (enforceCoalesceWithRepartition) {
+        // check clustering plan for sort columns. only if there are no sort 
columns, then we might still set enforceCoalesceWithRepartition to true.
+        HoodieClusteringPlan clusteringPlan = 
ClusteringUtils.getClusteringPlan(
+                table.getMetaClient(), 
ClusteringUtils.getRequestedClusteringInstant(instantTime, 
table.getActiveTimeline(), table.getInstantGenerator()).get())
+            .map(Pair::getRight).orElseThrow(() -> new 
HoodieClusteringException(
+                "Unable to read clustering plan for instant: " + instantTime));
+        enforceCoalesceWithRepartition = 
!clusteringPlan.getStrategy().getStrategyParams().containsKey(PLAN_STRATEGY_SORT_COLUMNS.key());
+      }
+      
writeMetadata.setWriteStatuses(streamingMetadataWriteHandler.streamWriteToMetadataTable(table,
 writeMetadata.getWriteStatuses(), instantTime,
+          enforceCoalesceWithRepartition, 
config.getMetadataConfig().getStreamingWritesCoalesceDivisorForDataTableWrites()));
     }
     return writeMetadata;
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 678697521f1c..94b6e408a2f9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -19,6 +19,7 @@
 package org.apache.hudi.client;
 
 import org.apache.hudi.callback.common.WriteStatusValidator;
+import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.index.HoodieSparkIndexClient;
 import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
@@ -108,7 +109,9 @@ public class SparkRDDWriteClient<T> extends
     final JavaRDD<WriteStatus> writeStatuses;
     if 
(WriteOperationType.streamingWritesToMetadataSupported((getOperationType())) && 
isStreamingWriteToMetadataEnabled(table)) {
       // this code block is expected to create a new Metadata Writer, start a 
new commit in metadata table and trigger streaming write to metadata table.
-      writeStatuses = 
HoodieJavaRDD.getJavaRDD(streamingMetadataWriteHandler.streamWriteToMetadataTable(table,
 HoodieJavaRDD.of(rawWriteStatuses), instantTime));
+      boolean enforceCoalesceWithRepartition = getOperationType() == 
WriteOperationType.BULK_INSERT && config.getBulkInsertSortMode() == 
BulkInsertSortMode.NONE;
+      writeStatuses = 
HoodieJavaRDD.getJavaRDD(streamingMetadataWriteHandler.streamWriteToMetadataTable(table,
 HoodieJavaRDD.of(rawWriteStatuses), instantTime,
+          enforceCoalesceWithRepartition, 
config.getMetadataConfig().getStreamingWritesCoalesceDivisorForDataTableWrites()));
     } else {
       writeStatuses = rawWriteStatuses;
     }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
index effd7b9f1b18..93cf377cb6be 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java
@@ -24,14 +24,21 @@ import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.common.util.VisibleForTesting;
+import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.metadata.HoodieTableMetadataWriter;
 import org.apache.hudi.table.HoodieTable;
 
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import scala.Tuple2;
+
 /**
  * Class to assist with streaming writes to metadata table.
  */
@@ -47,21 +54,25 @@ public class StreamingMetadataWriteHandler {
    * @param table                  The {@link HoodieTable} instance for data 
table of interest.
    * @param dataTableWriteStatuses The {@link WriteStatus} from data table 
writes.
    * @param instantTime            The instant time of interest.
-   *
+   * @param enforceCoalesceWithRepartition true when a repartition has to be 
added to the dag to coalesce data table write statuses. false otherwise.
+   * @param coalesceDivisorForDataTableWrites assists with determining the 
coalesce parallelism for data table write statuses. The number of data table write status
+   *                                          spark partitions will be divided 
by this value to find the coalesce parallelism.
    * @return {@link HoodieData} of {@link WriteStatus} referring to both data 
table writes and partial metadata table writes.
    */
-  public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, 
HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime) {
+  public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, 
HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime,
+                                                           boolean 
enforceCoalesceWithRepartition, int coalesceDivisorForDataTableWrites) {
     Option<HoodieTableMetadataWriter> metadataWriterOpt = 
getMetadataWriter(instantTime, table);
     ValidationUtils.checkState(metadataWriterOpt.isPresent(),
         "Cannot instantiate metadata writer for the table of interest " + 
table.getMetaClient().getBasePath());
-    return streamWriteToMetadataTable(dataTableWriteStatuses, 
metadataWriterOpt.get(), table, instantTime);
+    return streamWriteToMetadataTable(dataTableWriteStatuses, 
metadataWriterOpt.get(), table, instantTime, enforceCoalesceWithRepartition,
+        coalesceDivisorForDataTableWrites);
   }
 
   /**
    * To be invoked by write client or table service client to complete the 
write to metadata table.
    *
    * <p>When streaming writes is enabled, writes to left over metadata 
partitions
-   * which is not covered in {@link #streamWriteToMetadataTable(HoodieTable, 
HoodieData, String)},
+   * which is not covered in {@link #streamWriteToMetadataTable(HoodieTable, 
HoodieData, String, boolean, int)},
    * otherwise writes to metadata table in legacy way(batch update without 
partial updates).
    *
    * @param table       The {@link HoodieTable} instance for data table of 
interest.
@@ -87,12 +98,24 @@ public class StreamingMetadataWriteHandler {
   private HoodieData<WriteStatus> 
streamWriteToMetadataTable(HoodieData<WriteStatus> dataTableWriteStatuses,
                                                              
HoodieTableMetadataWriter metadataWriter,
                                                              HoodieTable table,
-                                                             String 
instantTime) {
-    HoodieData<WriteStatus> allWriteStatus = dataTableWriteStatuses;
+                                                             String 
instantTime,
+                                                             boolean 
enforceCoalesceWithRepartition,
+                                                             int 
coalesceDivisorForDataTableWrites) {
     HoodieData<WriteStatus> mdtWriteStatuses = 
metadataWriter.streamWriteToMetadataPartitions(dataTableWriteStatuses, 
instantTime);
-    allWriteStatus = allWriteStatus.union(mdtWriteStatuses);
-    allWriteStatus.persist("MEMORY_AND_DISK_SER", table.getContext(), 
HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(),
 instantTime));
-    return allWriteStatus;
+    mdtWriteStatuses.persist("MEMORY_AND_DISK_SER", table.getContext(), 
HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(),
 instantTime));
+    HoodieData<WriteStatus> coalescedDataWriteStatuses;
+    int coalesceParallelism = Math.max(1, 
dataTableWriteStatuses.getNumPartitions() / coalesceDivisorForDataTableWrites);
+    if (enforceCoalesceWithRepartition) {
+      // with bulk insert and NONE sort mode, simple coalesce on datatable 
write statuses also impact record key generation stages.
+      // and hence we are adding a partitioner to cut the chain so that 
coalesce(1) here does not impact record key generation stages.
+      coalescedDataWriteStatuses = 
HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(dataTableWriteStatuses)
+          .mapToPair((PairFunction<WriteStatus, String, WriteStatus>) 
writeStatus -> new Tuple2(writeStatus.getStat().getPath(), writeStatus))
+          .partitionBy(new CoalescingPartitioner(coalesceParallelism))
+          .map((Function<Tuple2<String, WriteStatus>, WriteStatus>) entry -> 
entry._2));
+    } else {
+      coalescedDataWriteStatuses = 
dataTableWriteStatuses.coalesce(coalesceParallelism);
+    }
+    return coalescedDataWriteStatuses.union(mdtWriteStatuses);
   }
 
   /**
@@ -103,7 +126,8 @@ public class StreamingMetadataWriteHandler {
    *
    * @return The metadata writer option.
    */
-  private synchronized Option<HoodieTableMetadataWriter> 
getMetadataWriter(String triggeringInstant, HoodieTable table) {
+  @VisibleForTesting
+  synchronized Option<HoodieTableMetadataWriter> getMetadataWriter(String 
triggeringInstant, HoodieTable table) {
 
     if 
(!table.getMetaClient().getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT))
 {
       return Option.empty();
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
index 193c7c955da8..5a4043f659fe 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java
@@ -209,4 +209,9 @@ public class HoodieJavaRDD<T> implements HoodieData<T> {
   public HoodieData<T> repartition(int parallelism) {
     return HoodieJavaRDD.of(rddData.repartition(parallelism));
   }
+
+  @Override
+  public HoodieData<T> coalesce(int parallelism) {
+    return HoodieJavaRDD.of(rddData.coalesce(parallelism));
+  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCoalescingPartitioner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCoalescingPartitioner.java
new file mode 100644
index 000000000000..0f3b972574ce
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCoalescingPartitioner.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.api.java.function.PairFunction;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
+
+import scala.Tuple2;
+
+import static 
org.apache.hudi.client.TestCoalescingPartitioner.FlatMapFunc.getWriteStatusForPartition;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestCoalescingPartitioner extends HoodieClientTestBase {
+
+  @Test
+  public void simpleCoalescingPartitionerTest() {
+    int numPartitions = 100;
+    HoodieData<Integer> rddData = HoodieJavaRDD.of(jsc.parallelize(
+        IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), 
numPartitions));
+
+    // 101 keys (0 to 100 inclusive) hashed across 10 partitions.
+    CoalescingPartitioner coalescingPartitioner = new 
CoalescingPartitioner(10);
+    assertEquals(10, coalescingPartitioner.numPartitions());
+    rddData.collectAsList().forEach(entry -> {
+      assertEquals(entry.hashCode() % 10, 
coalescingPartitioner.getPartition(entry));
+    });
+
+    // 1 partition
+    CoalescingPartitioner coalescingPartitioner1 = new 
CoalescingPartitioner(1);
+    assertEquals(1, coalescingPartitioner1.numPartitions());
+    rddData.collectAsList().forEach(entry -> {
+      assertEquals(0, coalescingPartitioner1.getPartition(entry));
+    });
+
+    // empty rdd
+    rddData = HoodieJavaRDD.of(jsc.emptyRDD());
+    CoalescingPartitioner coalescingPartitioner2 = new 
CoalescingPartitioner(1);
+    assertEquals(1, coalescingPartitioner2.numPartitions());
+    rddData.collectAsList().forEach(entry -> {
+      // since there is only one partition, any getPartition will return just 
the same partition index
+      assertEquals(0, coalescingPartitioner2.getPartition(entry));
+    });
+  }
+
+  private static Stream<Arguments> coalesceTestArgs() {
+    return Arrays.stream(new Object[][] {
+        {100, 1},
+        {1, 1},
+        {1000, 10},
+        {100, 7},
+        {1200, 50},
+        {1000, 23},
+        {10, 2}
+    }).map(Arguments::of);
+  }
+
+  @ParameterizedTest
+  @MethodSource("coalesceTestArgs")
+  public void testCoalescingPartitionerWithRDD(int inputNumPartitions, int 
targetPartitions) {
+    int totalHudiPartitions = Math.max(1, inputNumPartitions / 
targetPartitions);
+    String partitionPathPrefix = "pPath";
+    List<String> partitionPaths = IntStream.rangeClosed(1, 
totalHudiPartitions).boxed().map(integer -> partitionPathPrefix + "_" + 
integer).collect(Collectors.toList());
+    List<WriteStatus> writeStatuses = new 
ArrayList<>(jsc.parallelize(partitionPaths, partitionPaths.size()).flatMap(new 
FlatMapFunc(targetPartitions != 1
+        ? inputNumPartitions / totalHudiPartitions : 
targetPartitions)).collect());
+
+    // for pending files, add to last partition.
+    if (targetPartitions != 1 && inputNumPartitions - writeStatuses.size() > 
0) {
+      writeStatuses.addAll(getWriteStatusForPartition("/tmp/", 
partitionPathPrefix + "_" + (totalHudiPartitions - 1), inputNumPartitions - 
writeStatuses.size()));
+    }
+
+    assertEquals(writeStatuses.size(), inputNumPartitions);
+
+    JavaRDD<WriteStatus> data = jsc.parallelize(writeStatuses, 
inputNumPartitions);
+    JavaRDD<WriteStatus> coalescedData = data.mapToPair(new 
PairFunc()).partitionBy(new CoalescingPartitioner(targetPartitions)).map(new 
MapFunc());
+    coalescedData.cache();
+
+    List<Pair<Integer, Integer>> countsPerPartition = 
coalescedData.mapPartitionsWithIndex((partitionIndex, rows) -> {
+      int count = 0;
+      while (rows.hasNext()) {
+        rows.next();
+        count++;
+      }
+      return Collections.singletonList(Pair.of(partitionIndex, 
count)).iterator();
+    }, true).collect();
+
+    assertEquals(targetPartitions, countsPerPartition.size());
+    // let's validate that each spark partition holds at least 50% of the data 
compared to the ideal scenario (we can't assume hash of strings will evenly 
distribute).
+    countsPerPartition.forEach(pair -> {
+      int numElements = pair.getValue();
+      int idealExpectedCount = inputNumPartitions / targetPartitions;
+      assertTrue(numElements > idealExpectedCount * 0.5);
+    });
+    assertEquals(targetPartitions, coalescedData.getNumPartitions());
+    List<WriteStatus> result = new ArrayList<>(coalescedData.collect());
+    // lets validate all paths from input are present in output as well.
+    List<String> expectedInputPaths = writeStatuses.stream().map(writeStatus 
-> writeStatus.getStat().getPath()).collect(Collectors.toList());
+    List<String> actualPaths = result.stream().map(writeStatus -> 
writeStatus.getStat().getPath()).collect(Collectors.toList());
+    Collections.sort(expectedInputPaths);
+    Collections.sort(actualPaths);
+    assertEquals(expectedInputPaths, actualPaths);
+    coalescedData.unpersist();
+  }
+
+  static class FlatMapFunc implements FlatMapFunction<String, WriteStatus> {
+
+    private int numWriteStatuses;
+
+    FlatMapFunc(int numWriteStatuses) {
+      this.numWriteStatuses = numWriteStatuses;
+    }
+
+    @Override
+    public Iterator<WriteStatus> call(String s) throws Exception {
+      return getWriteStatusForPartition("/tmp", s, 
numWriteStatuses).iterator();
+    }
+
+    static List<WriteStatus> getWriteStatusForPartition(String basePath, 
String partititionPath, int numWriteStatuses) {
+      String randomPrefix = UUID.randomUUID().toString() + "_";
+      List<WriteStatus> writeStatuses = new ArrayList<>();
+      for (int i = 0; i < numWriteStatuses; i++) {
+        String fileName = randomPrefix + i;
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setPartitionPath(partititionPath);
+        String fullFilePath = basePath + "/" + partititionPath + "/" + 
fileName;
+        writeStat.setPath(fullFilePath);
+        WriteStatus writeStatus = new WriteStatus();
+        writeStatus.setStat(writeStat);
+        writeStatuses.add(writeStatus);
+      }
+      return writeStatuses;
+    }
+  }
+
+  static class PairFunc implements PairFunction<WriteStatus, String, 
WriteStatus> {
+    @Override
+    public Tuple2<String, WriteStatus> call(WriteStatus writeStatus) throws 
Exception {
+      return Tuple2.apply(writeStatus.getStat().getPath(), writeStatus);
+    }
+  }
+
+  static class MapFunc implements Function<Tuple2<String, WriteStatus>, 
WriteStatus> {
+
+    @Override
+    public WriteStatus call(Tuple2<String, WriteStatus> booleanIntegerTuple2) 
throws Exception {
+      return booleanIntegerTuple2._2;
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
index 238bdabc4003..2eb4f852f020 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java
@@ -19,20 +19,29 @@
 
 package org.apache.hudi.client;
 
+import org.apache.hudi.avro.model.HoodieClusteringPlan;
+import org.apache.hudi.avro.model.HoodieClusteringStrategy;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.InstantComparison;
 import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.util.ClusteringUtils;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.data.HoodieJavaRDD;
 import org.apache.hudi.metadata.HoodieTableMetadata;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
 import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
 
 import org.apache.avro.generic.GenericRecord;
@@ -42,20 +51,29 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.CsvSource;
 import org.junit.jupiter.params.provider.MethodSource;
+import org.mockito.MockedStatic;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static 
org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC;
+import static 
org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS;
 import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.when;
 
 class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness {
 
@@ -205,6 +223,62 @@ class TestSparkRDDWriteClient extends 
SparkClientFunctionalTestHarness {
     testAndAssertCompletionIsEarlierThanRequested(basePath, props);
   }
 
+  /**
+   * Supplies the argument rows for {@code testStreamingMetadataWrites}.
+   *
+   * <p>Each row holds, in order: {@code streamingWritesEnable}, {@code writeOperationType}
+   * (given as a string and bound to the test's {@code WriteOperationType} parameter),
+   * {@code bulkInsertSortMode}, {@code setSortColsinClusteringPlan} and
+   * {@code expectedEnforceRepartitionWithCoalesce}.
+   *
+   * <p>Only the CLUSTER / NONE row without sort columns expects {@code true}.
+   */
+  private static Stream<Arguments> streamingMetadataWritesTestArgs() {
+    return Arrays.stream(new Object[][] {
+        {true, "COMPACT", "NONE", false, false},
+        {true, "COMPACT", "NONE", true, false},
+        {true, "COMPACT", "GLOBAL_SORT", true, false},
+        {true, "COMPACT", "GLOBAL_SORT", false, false},
+        {true, "LOG_COMPACT", "NONE", true, false},
+        {true, "LOG_COMPACT", "NONE", false, false},
+        {true, "LOG_COMPACT", "GLOBAL_SORT", true, false},
+        {true, "LOG_COMPACT", "GLOBAL_SORT", false, false},
+        {true, "CLUSTER", "NONE", true, false},
+        {true, "CLUSTER", "NONE", false, true},
+        {true, "CLUSTER", "GLOBAL_SORT", true, false},
+        {true, "CLUSTER", "GLOBAL_SORT", false, false},
+    }).map(Arguments::of);
+  }
+
+  /**
+   * Checks which value of {@code enforceCoalesceWithRepartition} the table service client hands to
+   * the streaming metadata write handler when {@code partialUpdateTableMetadata} is invoked.
+   *
+   * <p>{@code ClusteringUtils} is statically mocked so that a clustering plan is always found; the
+   * plan's strategy params contain the sort-columns key only when
+   * {@code setSortColsinClusteringPlan} is set. The flag is captured by
+   * {@code MockStreamingMetadataWriteHandler} and compared with the expected value.
+   *
+   * @param streamingWritesEnable value passed to {@code withStreamingWriteEnabled} on the metadata config
+   * @param writeOperationType operation type passed to {@code partialUpdateTableMetadata}
+   * @param bulkInsertSortMode value passed to {@code withBulkInsertSortMode} on the write config
+   * @param setSortColsinClusteringPlan whether the mocked clustering strategy carries sort columns
+   * @param expectedEnforceRepartitionWithCoalesce flag value the handler is expected to receive
+   */
+  @ParameterizedTest
+  @MethodSource("streamingMetadataWritesTestArgs")
+  public void testStreamingMetadataWrites(boolean streamingWritesEnable, 
WriteOperationType writeOperationType,
+                                          String bulkInsertSortMode, boolean 
setSortColsinClusteringPlan,
+                                          boolean 
expectedEnforceRepartitionWithCoalesce) throws IOException {
+    HoodieTableMetaClient metaClient =
+        getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), 
new Properties());
+    HoodieWriteConfig writeConfig = getConfigBuilder(true)
+        
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withStreamingWriteEnabled(streamingWritesEnable).build())
+        .withBulkInsertSortMode(bulkInsertSortMode)
+        .withPath(metaClient.getBasePath())
+        .build();
+    // The handler below only records the arguments it is called with.
+    MockStreamingMetadataWriteHandler mockMetadataWriteHandler = new 
MockStreamingMetadataWriteHandler();
+
+    // The static mock is scoped to this try block and released when it closes.
+    try (MockedStatic<ClusteringUtils> mocked = 
mockStatic(ClusteringUtils.class);) {
+      HoodieClusteringPlan clusteringPlan = mock(HoodieClusteringPlan.class);
+      HoodieClusteringStrategy clusteringStrategy = 
mock(HoodieClusteringStrategy.class);
+      when(clusteringPlan.getStrategy()).thenReturn(clusteringStrategy);
+      Map<String, String> strategyParams = new HashMap<>();
+      if (setSortColsinClusteringPlan) {
+        strategyParams.put(PLAN_STRATEGY_SORT_COLUMNS.key(), "abc");
+      }
+      when(clusteringStrategy.getStrategyParams()).thenReturn(strategyParams);
+
+      // Any clustering lookup resolves to the mocked instant and plan built above.
+      HoodieInstant hoodieInstant = mock(HoodieInstant.class);
+      mocked.when(() -> ClusteringUtils.getClusteringPlan(any(), 
any())).thenReturn(Option.of(Pair.of(hoodieInstant, clusteringPlan)));
+      mocked.when(() -> ClusteringUtils.getRequestedClusteringInstant(any(), 
any(), any())).thenReturn(Option.of(hoodieInstant));
+
+      SparkRDDTableServiceClient tableServiceClient = new 
SparkRDDTableServiceClient(context(), writeConfig, Option.empty(), 
mockMetadataWriteHandler);
+      HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = 
mock(HoodieWriteMetadata.class);
+      HoodieData<WriteStatus> hoodieData = mock(HoodieData.class);
+      when(writeMetadata.getWriteStatuses()).thenReturn(hoodieData);
+      HoodieTable table = mock(HoodieTable.class);
+      when(table.getMetaClient()).thenReturn(metaClient);
+      tableServiceClient.partialUpdateTableMetadata(table, writeMetadata, 
"00001", writeOperationType);
+      // The flag recorded by the mock handler is the value under test.
+      assertEquals(expectedEnforceRepartitionWithCoalesce, 
mockMetadataWriteHandler.enforceCoalesceWithRepartition);
+    }
+  }
+
   private void testAndAssertCompletionIsEarlierThanRequested(String basePath, 
Properties properties) throws IOException {
     HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), 
basePath, properties);
 
@@ -225,4 +299,18 @@ class TestSparkRDDWriteClient extends 
SparkClientFunctionalTestHarness {
     });
   }
 
+  /**
+   * Test double for {@link StreamingMetadataWriteHandler} that records the coalesce-related
+   * arguments of {@code streamWriteToMetadataTable} and returns the data table write statuses
+   * it was given, unchanged.
+   *
+   * <p>Declared {@code static} because it uses no state of the enclosing test instance.
+   */
+  static class MockStreamingMetadataWriteHandler extends StreamingMetadataWriteHandler {
+
+    // Last values seen by streamWriteToMetadataTable; read directly by the test.
+    boolean enforceCoalesceWithRepartition;
+    int coalesceDivisorForDataTableWrites;
+
+    @Override
+    public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime,
+                                                              boolean enforceCoalesceWithRepartition, int coalesceDivisorForDataTableWrites) {
+      this.enforceCoalesceWithRepartition = enforceCoalesceWithRepartition;
+      this.coalesceDivisorForDataTableWrites = coalesceDivisorForDataTableWrites;
+      return dataTableWriteStatuses;
+    }
+  }
+
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestStreamingMetadataWriteHandler.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestStreamingMetadataWriteHandler.java
new file mode 100644
index 000000000000..39ac8bdba74a
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestStreamingMetadataWriteHandler.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.client;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.data.HoodieJavaRDD;
+import org.apache.hudi.metadata.HoodieTableMetadataWriter;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Verifies the partition count of the write statuses returned by
+ * {@code StreamingMetadataWriteHandler#streamWriteToMetadataTable}: the data table write statuses
+ * are expected to shrink to {@code max(1, count / divisor)} partitions, to which the metadata
+ * table write status partitions are added.
+ */
+public class TestStreamingMetadataWriteHandler extends SparkClientFunctionalTestHarness {
+
+  private final HoodieTable<?, ?, ?, ?> mockHoodieTable = mock(HoodieTable.class);
+  private HoodieTableMetaClient metaClient;
+
+  /**
+   * Wires the mocked table to a mocked meta client (base path {@code /tmp/}) and engine context.
+   */
+  @BeforeEach
+  void setUp() {
+    metaClient = mock(HoodieTableMetaClient.class);
+    when(metaClient.getBasePath()).thenReturn(new StoragePath("/tmp/"));
+    when(mockHoodieTable.getMetaClient()).thenReturn(metaClient);
+    HoodieEngineContext engineContext = mock(HoodieEngineContext.class);
+    when(mockHoodieTable.getContext()).thenReturn(engineContext);
+  }
+
+  /**
+   * Argument rows: number of data table write statuses, number of metadata table write statuses,
+   * coalesce divisor, and whether coalescing is enforced via repartition.
+   *
+   * <p>Every size/divisor combination is listed once per value of the boolean flag so that both
+   * code paths are covered.
+   */
+  private static Stream<Arguments> coalesceDivisorTestArgs() {
+    return Arrays.stream(new Object[][] {
+        {100, 20, 1000, true},
+        {100, 20, 1000, false},
+        {1, 1, 1000, true},
+        {1, 1, 1000, false},
+        {10000, 100, 5000, true},
+        {10000, 100, 5000, false},
+        {10000, 100, 20000, true},
+        {10000, 100, 20000, false}
+    }).map(Arguments::of);
+  }
+
+  /**
+   * Streams the given data table write statuses through the handler and checks the number of
+   * partitions of the combined result.
+   *
+   * @param numDataTableWriteStatuses number of (single-element) data table write status partitions
+   * @param numMdtWriteStatus number of (single-element) metadata table write status partitions
+   * @param coalesceDividentForDataTableWrites divisor applied to the data table partition count
+   * @param enforceCoalesceWithRepartition flag forwarded to the handler
+   */
+  @ParameterizedTest
+  @MethodSource("coalesceDivisorTestArgs")
+  public void testCoalesceDividentConfig(int numDataTableWriteStatuses, int numMdtWriteStatus, int coalesceDividentForDataTableWrites,
+                                         boolean enforceCoalesceWithRepartition) {
+    HoodieData<WriteStatus> dataTableWriteStatus = mockWriteStatuses(numDataTableWriteStatuses);
+    HoodieData<WriteStatus> mdtWriteStatus = mockWriteStatuses(numMdtWriteStatus);
+    HoodieTableMetadataWriter mdtWriter = mock(HoodieTableMetadataWriter.class);
+    when(mdtWriter.streamWriteToMetadataPartitions(any(), any())).thenReturn(mdtWriteStatus);
+    StreamingMetadataWriteHandler metadataWriteHandler = new MockStreamingMetadataWriteHandler(mdtWriter);
+
+    HoodieData<WriteStatus> allWriteStatuses = metadataWriteHandler.streamWriteToMetadataTable(mockHoodieTable, dataTableWriteStatus, "00001", enforceCoalesceWithRepartition,
+        coalesceDividentForDataTableWrites);
+    // Integer division can yield 0, hence the floor of one partition for the data table side.
+    assertEquals(Math.max(1, numDataTableWriteStatuses / coalesceDividentForDataTableWrites) + numMdtWriteStatus, allWriteStatuses.getNumPartitions());
+  }
+
+  /**
+   * Builds an RDD-backed {@link HoodieData} of {@code size} mocked write statuses spread over
+   * {@code size} partitions.
+   */
+  private HoodieData<WriteStatus> mockWriteStatuses(int size) {
+    List<WriteStatus> writeStatuses = new ArrayList<>();
+    for (int i = 0; i < size; i++) {
+      writeStatuses.add(mock(WriteStatus.class));
+    }
+    return HoodieJavaRDD.of(jsc().parallelize(writeStatuses, size));
+  }
+
+  /**
+   * Handler whose metadata writer lookup always yields the writer supplied at construction.
+   */
+  static class MockStreamingMetadataWriteHandler extends StreamingMetadataWriteHandler {
+
+    private final HoodieTableMetadataWriter mdtWriter;
+
+    MockStreamingMetadataWriteHandler(HoodieTableMetadataWriter mdtWriter) {
+      this.mdtWriter = mdtWriter;
+    }
+
+    @Override
+    synchronized Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstant, HoodieTable table) {
+      return Option.of(mdtWriter);
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
index 2ba497264a5b..16380e18cdd5 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java
@@ -71,6 +71,31 @@ public class TestHoodieJavaRDD extends HoodieClientTestBase {
     assertEquals(11, shuffleRDD.deduceNumPartitions());
   }
 
+  /**
+   * Checks the partition count reported after a chain of {@code repartition} and
+   * {@code coalesce} calls on an RDD-backed {@link HoodieData}.
+   */
+  @Test
+  public void testRepartitionAndCoalesce() {
+    int numPartitions = 100;
+    // parallelize the integers 0..100 (inclusive) across numPartitions partitions
+    HoodieData<Integer> rddData = HoodieJavaRDD.of(jsc.parallelize(
+        IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), numPartitions));
+    assertEquals(numPartitions, rddData.getNumPartitions());
+
+    // repartition down to 10
+    rddData = rddData.repartition(10);
+    assertEquals(10, rddData.getNumPartitions());
+
+    // coalesce down to 5
+    rddData = rddData.coalesce(5);
+    assertEquals(5, rddData.getNumPartitions());
+
+    // repartition can grow the partition count again, here to 20
+    rddData = rddData.repartition(20);
+    assertEquals(20, rddData.getNumPartitions());
+
+    // but coalesce does not expand the number of partitions: asking for 40 leaves it at 20
+    rddData = rddData.coalesce(40);
+    assertEquals(20, rddData.getNumPartitions());
+  }
+
   @Test
   void testMapPartitionsWithCloseable() {
     String partition1 = "partition1";
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 70b0365dd67e..96c95708ab75 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -81,6 +81,15 @@ public final class HoodieMetadataConfig extends HoodieConfig 
{
           + "in streaming manner rather than two disjoint writes. By default "
           + "streaming writes to metadata table is enabled for SPARK engine 
for incremental operations and disabled for all other cases.");
 
+  /**
+   * Divisor used to shrink the number of data table write status partitions before they are
+   * unioned with the metadata table write statuses during streaming writes. Defaults to 5000.
+   */
+  public static final ConfigProperty<Integer> STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR = ConfigProperty
+      .key(METADATA_PREFIX + ".streaming.write.datatable.write.statuses.coalesce.divisor")
+      .defaultValue(5000)
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("When streaming writes to metadata table is enabled via hoodie.metadata.streaming.write.enabled, the data table write statuses are unioned "
+          + "with metadata table write statuses before triggering the entire write dag. The data table write statuses will be coalesced down to the number of write statuses "
+          + "divided by the specified divisor to avoid triggering thousands of no-op tasks for the data table writes which have their status cached.");
+
   public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true;
 
   // Enable metrics for internal Metadata Table
@@ -609,6 +618,10 @@ public final class HoodieMetadataConfig extends 
HoodieConfig {
     return getBoolean(STREAMING_WRITE_ENABLED);
   }
 
+  /**
+   * Reads the integer value configured for
+   * {@link #STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR}.
+   *
+   * @return the coalesce divisor for data table write statuses
+   */
+  public int getStreamingWritesCoalesceDivisorForDataTableWrites() {
+    return getInt(STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR);
+  }
+
   public boolean isBloomFilterIndexEnabled() {
     return getBooleanOrDefault(ENABLE_METADATA_INDEX_BLOOM_FILTER);
   }
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
index a0b2ce75e967..cd95ee23a7e6 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java
@@ -220,6 +220,15 @@ public interface HoodieData<T> extends Serializable {
    */
   HoodieData<T> repartition(int parallelism);
 
+  /**
+   * Coalesces the underlying collection (if applicable) so that the returned {@link HoodieData}
+   * has at most {@code parallelism} partitions.
+   *
+   * <p>Implementations may treat this as a no-op, for example when the data is not partitioned.
+   *
+   * @param parallelism target number of partitions in the underlying collection
+   * @return {@link HoodieData} holding the coalesced collection
+   */
+  HoodieData<T> coalesce(int parallelism);
+
   default <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> 
keyGetter, int parallelism) {
     return mapToPair(i -> Pair.of(keyGetter.apply(i), i))
         .reduceByKey((value1, value2) -> value1, parallelism)
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java 
b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
index 7fd16af82382..6da031e1a5de 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java
@@ -197,6 +197,12 @@ public class HoodieListData<T> extends 
HoodieBaseListData<T> implements HoodieDa
     return this;
   }
 
+  /**
+   * No-op for this list-based implementation: {@code parallelism} is ignored and this
+   * instance is returned as is.
+   */
+  @Override
+  public HoodieData<T> coalesce(int parallelism) {
+    // no op
+    return this;
+  }
+
   @Override
   public boolean isEmpty() {
     return super.isEmpty();
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
index eb97a4c4a345..73d705df907d 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java
@@ -59,4 +59,27 @@ class TestHoodieMetadataConfig {
         .build();
     assertEquals(-50, 
configWithNegativeValue.getRecordPreparationParallelism());
   }
+
+  /**
+   * Checks that the coalesce divisor getter returns the default of 5000 when unset and echoes
+   * explicitly configured values (1 and 10000).
+   */
+  @Test
+  void testStreamingWritesCoalesceDivisorForDataTableWrites() {
+    // Test default value
+    HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().build();
+    assertEquals(5000, config.getStreamingWritesCoalesceDivisorForDataTableWrites());
+
+    // Test custom value
+    Properties props = new Properties();
+    props.put(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR.key(), "1");
+    HoodieMetadataConfig configWithCustomValue = HoodieMetadataConfig.newBuilder()
+        .fromProperties(props)
+        .build();
+    assertEquals(1, configWithCustomValue.getStreamingWritesCoalesceDivisorForDataTableWrites());
+
+    // Test a value larger than the default
+    Properties propsLargeValue = new Properties();
+    propsLargeValue.put(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR.key(), "10000");
+    HoodieMetadataConfig configWithLargeValue = HoodieMetadataConfig.newBuilder()
+        .fromProperties(propsLargeValue)
+        .build();
+    assertEquals(10000, configWithLargeValue.getStreamingWritesCoalesceDivisorForDataTableWrites());
+  }
+
 }

Reply via email to