This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push: new 14bde81ed0da perf: Fixing streaming dag performance wrt dt write status and mdt write status union (#13976) 14bde81ed0da is described below commit 14bde81ed0da8c4a54fa3051530d7018693c95c3 Author: Sivabalan Narayanan <n.siv...@gmail.com> AuthorDate: Wed Oct 1 08:30:17 2025 -0700 perf: Fixing streaming dag performance wrt dt write status and mdt write status union (#13976) --- .../hudi/client/BaseHoodieTableServiceClient.java | 8 +- .../apache/hudi/client/CoalescingPartitioner.java | 47 ++++++ .../hudi/client/SparkRDDTableServiceClient.java | 35 +++- .../apache/hudi/client/SparkRDDWriteClient.java | 5 +- .../hudi/client/StreamingMetadataWriteHandler.java | 44 +++-- .../java/org/apache/hudi/data/HoodieJavaRDD.java | 5 + .../hudi/client/TestCoalescingPartitioner.java | 187 +++++++++++++++++++++ .../hudi/client/TestSparkRDDWriteClient.java | 88 ++++++++++ .../client/TestStreamingMetadataWriteHandler.java | 110 ++++++++++++ .../org/apache/hudi/data/TestHoodieJavaRDD.java | 25 +++ .../hudi/common/config/HoodieMetadataConfig.java | 13 ++ .../org/apache/hudi/common/data/HoodieData.java | 9 + .../apache/hudi/common/data/HoodieListData.java | 6 + .../common/config/TestHoodieMetadataConfig.java | 23 +++ 14 files changed, 587 insertions(+), 18 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java index cd06a875d1a6..f31d9358883b 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java @@ -235,7 +235,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl logCompactionTimer = metrics.getLogCompactionCtx(); WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime); HoodieWriteMetadata<T> writeMetadata = table.logCompact(context, logCompactionInstantTime); - HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime); + HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime, WriteOperationType.LOG_COMPACT); HoodieWriteMetadata<O> logCompactionMetadata = convertToOutputMetadata(updatedWriteMetadata); if (shouldComplete) { commitLogCompaction(logCompactionInstantTime, logCompactionMetadata, Option.of(table)); @@ -318,7 +318,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl } compactionTimer = metrics.getCompactionCtx(); HoodieWriteMetadata<T> writeMetadata = table.compact(context, compactionInstantTime); - HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime); + HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, compactionInstantTime, WriteOperationType.COMPACT); HoodieWriteMetadata<O> compactionWriteMetadata = convertToOutputMetadata(updatedWriteMetadata); if (shouldComplete) { commitCompaction(compactionInstantTime, compactionWriteMetadata, Option.of(table)); @@ -331,7 +331,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl * * @return The passed in {@code HoodieWriteMetadata} with probable partially updated write statuses. 
*/ - protected HoodieWriteMetadata<T> partialUpdateTableMetadata(HoodieTable table, HoodieWriteMetadata<T> writeMetadata, String instantTime) { + protected HoodieWriteMetadata<T> partialUpdateTableMetadata(HoodieTable table, HoodieWriteMetadata<T> writeMetadata, String instantTime, WriteOperationType writeOperationType) { return writeMetadata; } @@ -487,7 +487,7 @@ public abstract class BaseHoodieTableServiceClient<I, T, O> extends BaseHoodieCl clusteringTimer = metrics.getClusteringCtx(); LOG.info("Starting clustering at {} for table {}", clusteringInstant, table.getConfig().getBasePath()); HoodieWriteMetadata<T> writeMetadata = table.cluster(context, clusteringInstant); - HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, clusteringInstant); + HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, clusteringInstant, WriteOperationType.CLUSTER); HoodieWriteMetadata<O> clusteringMetadata = convertToOutputMetadata(updatedWriteMetadata); // TODO : Where is shouldComplete used ? diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/CoalescingPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/CoalescingPartitioner.java new file mode 100644 index 000000000000..6a449ca39c95 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/CoalescingPartitioner.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client; + +import org.apache.spark.Partitioner; + +/** + * Partitioner used to coalesce records into a small, fixed number of partitions (typically just one).
+ */ +public class CoalescingPartitioner extends Partitioner { + + private final int numPartitions; + + CoalescingPartitioner(int numPartitions) { + this.numPartitions = numPartitions; + } + + @Override + public int numPartitions() { + return numPartitions; + } + + @Override + public int getPartition(Object key) { + if (numPartitions == 1) { + return 0; + } else { + return Math.abs(key.hashCode()) % numPartitions; + } + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java index a5e80f3fc755..77fe332c637f 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDTableServiceClient.java @@ -18,6 +18,7 @@ package org.apache.hudi.client; +import org.apache.hudi.avro.model.HoodieClusteringPlan; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.client.utils.SparkReleaseResources; import org.apache.hudi.client.utils.SparkValidatorUtils; @@ -26,10 +27,16 @@ import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.exception.HoodieClusteringException; +import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.metadata.HoodieTableMetadata; import org.apache.hudi.storage.StorageConfiguration; import org.apache.hudi.table.HoodieSparkTable; @@ -42,13 +49,24 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; +import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; + public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient<HoodieData<HoodieRecord<T>>, HoodieData<WriteStatus>, JavaRDD<WriteStatus>> { - private final StreamingMetadataWriteHandler streamingMetadataWriteHandler = new StreamingMetadataWriteHandler(); + private final StreamingMetadataWriteHandler streamingMetadataWriteHandler; protected SparkRDDTableServiceClient(HoodieEngineContext context, HoodieWriteConfig clientConfig, Option<EmbeddedTimelineService> timelineService) { + this(context, clientConfig, timelineService, new StreamingMetadataWriteHandler()); + } + + @VisibleForTesting + public SparkRDDTableServiceClient(HoodieEngineContext context, + HoodieWriteConfig clientConfig, + Option<EmbeddedTimelineService> timelineService, + StreamingMetadataWriteHandler streamingMetadataWriteHandler) { super(context, clientConfig, timelineService); + this.streamingMetadataWriteHandler = streamingMetadataWriteHandler; } @Override @@ -80,9 +98,20 @@ public class SparkRDDTableServiceClient<T> extends BaseHoodieTableServiceClient< protected HoodieWriteMetadata<HoodieData<WriteStatus>> partialUpdateTableMetadata( HoodieTable table, HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata, - String instantTime) { + String 
instantTime, + WriteOperationType writeOperationType) { if (isStreamingWriteToMetadataEnabled(table)) { - writeMetadata.setWriteStatuses(streamingMetadataWriteHandler.streamWriteToMetadataTable(table, writeMetadata.getWriteStatuses(), instantTime)); + boolean enforceCoalesceWithRepartition = writeOperationType == WriteOperationType.CLUSTER && config.getBulkInsertSortMode() == BulkInsertSortMode.NONE; + if (enforceCoalesceWithRepartition) { + // check clustering plan for sort columns. only if there are no sort columns, then we might still set enforceCoalesceWithRepartition to true. + HoodieClusteringPlan clusteringPlan = ClusteringUtils.getClusteringPlan( + table.getMetaClient(), ClusteringUtils.getRequestedClusteringInstant(instantTime, table.getActiveTimeline(), table.getInstantGenerator()).get()) + .map(Pair::getRight).orElseThrow(() -> new HoodieClusteringException( + "Unable to read clustering plan for instant: " + instantTime)); + enforceCoalesceWithRepartition = !clusteringPlan.getStrategy().getStrategyParams().containsKey(PLAN_STRATEGY_SORT_COLUMNS.key()); + } + writeMetadata.setWriteStatuses(streamingMetadataWriteHandler.streamWriteToMetadataTable(table, writeMetadata.getWriteStatuses(), instantTime, + enforceCoalesceWithRepartition, config.getMetadataConfig().getStreamingWritesCoalesceDivisorForDataTableWrites())); } return writeMetadata; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java index 678697521f1c..94b6e408a2f9 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java @@ -19,6 +19,7 @@ package org.apache.hudi.client; import org.apache.hudi.callback.common.WriteStatusValidator; +import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode; import org.apache.hudi.index.HoodieSparkIndexClient; import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.client.embedded.EmbeddedTimelineService; @@ -108,7 +109,9 @@ public class SparkRDDWriteClient<T> extends final JavaRDD<WriteStatus> writeStatuses; if (WriteOperationType.streamingWritesToMetadataSupported((getOperationType())) && isStreamingWriteToMetadataEnabled(table)) { // this code block is expected to create a new Metadata Writer, start a new commit in metadata table and trigger streaming write to metadata table. 
- writeStatuses = HoodieJavaRDD.getJavaRDD(streamingMetadataWriteHandler.streamWriteToMetadataTable(table, HoodieJavaRDD.of(rawWriteStatuses), instantTime)); + boolean enforceCoalesceWithRepartition = getOperationType() == WriteOperationType.BULK_INSERT && config.getBulkInsertSortMode() == BulkInsertSortMode.NONE; + writeStatuses = HoodieJavaRDD.getJavaRDD(streamingMetadataWriteHandler.streamWriteToMetadataTable(table, HoodieJavaRDD.of(rawWriteStatuses), instantTime, + enforceCoalesceWithRepartition, config.getMetadataConfig().getStreamingWritesCoalesceDivisorForDataTableWrites())); } else { writeStatuses = rawWriteStatuses; } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java index effd7b9f1b18..93cf377cb6be 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/StreamingMetadataWriteHandler.java @@ -24,14 +24,21 @@ import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.table.HoodieTableVersion; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.VisibleForTesting; +import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.exception.HoodieException; import org.apache.hudi.metadata.HoodieTableMetadataWriter; import org.apache.hudi.table.HoodieTable; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; + import java.util.HashMap; import java.util.List; import java.util.Map; +import scala.Tuple2; + /** * Class to assist with streaming writes to metadata table. */ @@ -47,21 +54,25 @@ public class StreamingMetadataWriteHandler { * @param table The {@link HoodieTable} instance for data table of interest. * @param dataTableWriteStatuses The {@link WriteStatus} from data table writes. * @param instantTime The instant time of interest. - * + * @param enforceCoalesceWithRepartition true when a repartition has to be added to the DAG to coalesce the data table write statuses down to 1; false otherwise. + * @param coalesceDivisorForDataTableWrites assists in determining the coalesce parallelism for data table write statuses. N data table write status + * spark partitions will be divided by this value to find the coalesce parallelism. * @return {@link HoodieData} of {@link WriteStatus} referring to both data table writes and partial metadata table writes.
*/ - public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime) { + public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime, + boolean enforceCoalesceWithRepartition, int coalesceDivisorForDataTableWrites) { Option<HoodieTableMetadataWriter> metadataWriterOpt = getMetadataWriter(instantTime, table); ValidationUtils.checkState(metadataWriterOpt.isPresent(), "Cannot instantiate metadata writer for the table of interest " + table.getMetaClient().getBasePath()); - return streamWriteToMetadataTable(dataTableWriteStatuses, metadataWriterOpt.get(), table, instantTime); + return streamWriteToMetadataTable(dataTableWriteStatuses, metadataWriterOpt.get(), table, instantTime, enforceCoalesceWithRepartition, + coalesceDivisorForDataTableWrites); } /** * To be invoked by write client or table service client to complete the write to metadata table. * * <p>When streaming writes is enabled, writes to left over metadata partitions - * which is not covered in {@link #streamWriteToMetadataTable(HoodieTable, HoodieData, String)}, + * which are not covered in {@link #streamWriteToMetadataTable(HoodieTable, HoodieData, String, boolean, int)}, * otherwise writes to metadata table in legacy way(batch update without partial updates). * * @param table The {@link HoodieTable} instance for data table of interest. @@ -87,12 +98,24 @@ public class StreamingMetadataWriteHandler { private HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieData<WriteStatus> dataTableWriteStatuses, HoodieTableMetadataWriter metadataWriter, HoodieTable table, - String instantTime) { - HoodieData<WriteStatus> allWriteStatus = dataTableWriteStatuses; + String instantTime, + boolean enforceCoalesceWithRepartition, + int coalesceDivisorForDataTableWrites) { HoodieData<WriteStatus> mdtWriteStatuses = metadataWriter.streamWriteToMetadataPartitions(dataTableWriteStatuses, instantTime); - allWriteStatus = allWriteStatus.union(mdtWriteStatuses); - allWriteStatus.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime)); - return allWriteStatus; + mdtWriteStatuses.persist("MEMORY_AND_DISK_SER", table.getContext(), HoodieData.HoodieDataCacheKey.of(table.getMetaClient().getBasePath().toString(), instantTime)); + HoodieData<WriteStatus> coalescedDataWriteStatuses; + int coalesceParallelism = Math.max(1, dataTableWriteStatuses.getNumPartitions() / coalesceDivisorForDataTableWrites); + if (enforceCoalesceWithRepartition) { + // with bulk insert and NONE sort mode, a simple coalesce on the data table write statuses also impacts the record key generation stages, + // hence we add a partitioner to cut the chain so that the coalesce here does not impact those stages.
+ coalescedDataWriteStatuses = HoodieJavaRDD.of(HoodieJavaRDD.getJavaRDD(dataTableWriteStatuses) + .mapToPair((PairFunction<WriteStatus, String, WriteStatus>) writeStatus -> new Tuple2(writeStatus.getStat().getPath(), writeStatus)) + .partitionBy(new CoalescingPartitioner(coalesceParallelism)) + .map((Function<Tuple2<String, WriteStatus>, WriteStatus>) entry -> entry._2)); + } else { + coalescedDataWriteStatuses = dataTableWriteStatuses.coalesce(coalesceParallelism); + } + return coalescedDataWriteStatuses.union(mdtWriteStatuses); } /** @@ -103,7 +126,8 @@ public class StreamingMetadataWriteHandler { * * @return The metadata writer option. */ - private synchronized Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstant, HoodieTable table) { + @VisibleForTesting + synchronized Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstant, HoodieTable table) { if (!table.getMetaClient().getTableConfig().getTableVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT)) { return Option.empty(); diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java index 193c7c955da8..5a4043f659fe 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/data/HoodieJavaRDD.java @@ -209,4 +209,9 @@ public class HoodieJavaRDD<T> implements HoodieData<T> { public HoodieData<T> repartition(int parallelism) { return HoodieJavaRDD.of(rddData.repartition(parallelism)); } + + @Override + public HoodieData<T> coalesce(int parallelism) { + return HoodieJavaRDD.of(rddData.coalesce(parallelism)); + } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCoalescingPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCoalescingPartitioner.java new file mode 100644 index 000000000000..0f3b972574ce --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestCoalescingPartitioner.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.client; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.testutils.HoodieClientTestBase; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.api.java.function.Function; +import org.apache.spark.api.java.function.PairFunction; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import scala.Tuple2; + +import static org.apache.hudi.client.TestCoalescingPartitioner.FlatMapFunc.getWriteStatusForPartition; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class TestCoalescingPartitioner extends HoodieClientTestBase { + + @Test + public void simpleCoalescingPartitionerTest() { + int numPartitions = 100; + HoodieData<Integer> rddData = HoodieJavaRDD.of(jsc.parallelize( + IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), numPartitions)); + + // 100 keys spread across 10 partitions. + CoalescingPartitioner coalescingPartitioner = new CoalescingPartitioner(10); + assertEquals(10, coalescingPartitioner.numPartitions()); + rddData.collectAsList().forEach(entry -> { + assertEquals(entry.hashCode() % 10, coalescingPartitioner.getPartition(entry)); + }); + + // 1 partition + CoalescingPartitioner coalescingPartitioner1 = new CoalescingPartitioner(1); + assertEquals(1, coalescingPartitioner1.numPartitions()); + rddData.collectAsList().forEach(entry -> { + assertEquals(0, coalescingPartitioner1.getPartition(entry)); + }); + + // empty rdd + rddData = HoodieJavaRDD.of(jsc.emptyRDD()); + CoalescingPartitioner coalescingPartitioner2 = new CoalescingPartitioner(1); + assertEquals(1, coalescingPartitioner2.numPartitions()); + rddData.collectAsList().forEach(entry -> { + // since there is only one partition, any getPartition will return just the same partition index + assertEquals(0, coalescingPartitioner2.getPartition(entry)); + }); + } + + private static Stream<Arguments> coalesceTestArgs() { + return Arrays.stream(new Object[][] { + {100, 1}, + {1, 1}, + {1000, 10}, + {100, 7}, + {1200, 50}, + {1000, 23}, + {10, 2} + }).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("coalesceTestArgs") + public void testCoalescingPartitionerWithRDD(int inputNumPartitions, int targetPartitions) { + int totalHudiPartitions = Math.max(1, inputNumPartitions / targetPartitions); + String partitionPathPrefix = "pPath"; + List<String> partitionPaths = IntStream.rangeClosed(1, totalHudiPartitions).boxed().map(integer -> partitionPathPrefix + "_" + integer).collect(Collectors.toList()); + List<WriteStatus> writeStatuses = new ArrayList<>(jsc.parallelize(partitionPaths, partitionPaths.size()).flatMap(new FlatMapFunc(targetPartitions != 1 + ? inputNumPartitions / totalHudiPartitions : targetPartitions)).collect()); + + // for pending files, add to last partition. 
+ if (targetPartitions != 1 && inputNumPartitions - writeStatuses.size() > 0) { + writeStatuses.addAll(getWriteStatusForPartition("/tmp/", partitionPathPrefix + "_" + (totalHudiPartitions - 1), inputNumPartitions - writeStatuses.size())); + } + + assertEquals(writeStatuses.size(), inputNumPartitions); + + JavaRDD<WriteStatus> data = jsc.parallelize(writeStatuses, inputNumPartitions); + JavaRDD<WriteStatus> coalescedData = data.mapToPair(new PairFunc()).partitionBy(new CoalescingPartitioner(targetPartitions)).map(new MapFunc()); + coalescedData.cache(); + + List<Pair<Integer, Integer>> countsPerPartition = coalescedData.mapPartitionsWithIndex((partitionIndex, rows) -> { + int count = 0; + while (rows.hasNext()) { + rows.next(); + count++; + } + return Collections.singletonList(Pair.of(partitionIndex, count)).iterator(); + }, true).collect(); + + assertEquals(targetPartitions, countsPerPartition.size()); + // lets validate that atleast we have 50% of data in each spark partition compared to ideal scenario (we can't assume hash of strings will evenly distribute). + countsPerPartition.forEach(pair -> { + int numElements = pair.getValue(); + int idealExpectedCount = inputNumPartitions / targetPartitions; + assertTrue(numElements > idealExpectedCount * 0.5); + }); + assertEquals(targetPartitions, coalescedData.getNumPartitions()); + List<WriteStatus> result = new ArrayList<>(coalescedData.collect()); + // lets validate all paths from input are present in output as well. + List<String> expectedInputPaths = writeStatuses.stream().map(writeStatus -> writeStatus.getStat().getPath()).collect(Collectors.toList()); + List<String> actualPaths = result.stream().map(writeStatus -> writeStatus.getStat().getPath()).collect(Collectors.toList()); + Collections.sort(expectedInputPaths); + Collections.sort(actualPaths); + assertEquals(expectedInputPaths, actualPaths); + coalescedData.unpersist(); + } + + static class FlatMapFunc implements FlatMapFunction<String, WriteStatus> { + + private int numWriteStatuses; + + FlatMapFunc(int numWriteStatuses) { + this.numWriteStatuses = numWriteStatuses; + } + + @Override + public Iterator<WriteStatus> call(String s) throws Exception { + return getWriteStatusForPartition("/tmp", s, numWriteStatuses).iterator(); + } + + static List<WriteStatus> getWriteStatusForPartition(String basePath, String partititionPath, int numWriteStatuses) { + String randomPrefix = UUID.randomUUID().toString() + "_"; + List<WriteStatus> writeStatuses = new ArrayList<>(); + for (int i = 0; i < numWriteStatuses; i++) { + String fileName = randomPrefix + i; + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setPartitionPath(partititionPath); + String fullFilePath = basePath + "/" + partititionPath + "/" + fileName; + writeStat.setPath(fullFilePath); + WriteStatus writeStatus = new WriteStatus(); + writeStatus.setStat(writeStat); + writeStatuses.add(writeStatus); + } + return writeStatuses; + } + } + + static class PairFunc implements PairFunction<WriteStatus, String, WriteStatus> { + @Override + public Tuple2<String, WriteStatus> call(WriteStatus writeStatus) throws Exception { + return Tuple2.apply(writeStatus.getStat().getPath(), writeStatus); + } + } + + static class MapFunc implements Function<Tuple2<String, WriteStatus>, WriteStatus> { + + @Override + public WriteStatus call(Tuple2<String, WriteStatus> booleanIntegerTuple2) throws Exception { + return booleanIntegerTuple2._2; + } + } +} diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java index 238bdabc4003..2eb4f852f020 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestSparkRDDWriteClient.java @@ -19,20 +19,29 @@ package org.apache.hudi.client; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieClusteringStrategy; import org.apache.hudi.client.embedded.EmbeddedTimelineService; import org.apache.hudi.common.config.HoodieMetadataConfig; +import org.apache.hudi.common.data.HoodieData; import org.apache.hudi.common.data.HoodieData.HoodieDataCacheKey; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieTableType; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.InstantComparison; import org.apache.hudi.common.table.view.FileSystemViewStorageConfig; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; import org.apache.hudi.config.HoodieWriteConfig; import org.apache.hudi.data.HoodieJavaRDD; import org.apache.hudi.metadata.HoodieTableMetadata; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.action.HoodieWriteMetadata; import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; import org.apache.avro.generic.GenericRecord; @@ -42,20 +51,29 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; +import org.mockito.MockedStatic; import java.io.IOException; import java.net.URI; +import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Properties; import java.util.stream.Collectors; import java.util.stream.Stream; import static org.apache.hudi.common.testutils.HoodieTestDataGenerator.getCommitTimeAtUTC; +import static org.apache.hudi.config.HoodieClusteringConfig.PLAN_STRATEGY_SORT_COLUMNS; import static org.apache.hudi.testutils.Assertions.assertNoWriteErrors; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness { @@ -205,6 +223,62 @@ class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness { testAndAssertCompletionIsEarlierThanRequested(basePath, props); } + private static Stream<Arguments> streamingMetadataWritesTestArgs() { + return Arrays.stream(new Object[][] { + {true, "COMPACT", "NONE", false, false}, + {true, "COMPACT", "NONE", true, false}, + {true, "COMPACT", "GLOBAL_SORT", true, false}, + {true, "COMPACT", "GLOBAL_SORT", 
false, false}, + {true, "LOG_COMPACT", "NONE", true, false}, + {true, "LOG_COMPACT", "NONE", false, false}, + {true, "LOG_COMPACT", "GLOBAL_SORT", true, false}, + {true, "LOG_COMPACT", "GLOBAL_SORT", false, false}, + {true, "CLUSTER", "NONE", true, false}, + {true, "CLUSTER", "NONE", false, true}, + {true, "CLUSTER", "GLOBAL_SORT", true, false}, + {true, "CLUSTER", "GLOBAL_SORT", false, false}, + }).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("streamingMetadataWritesTestArgs") + public void testStreamingMetadataWrites(boolean streamingWritesEnable, WriteOperationType writeOperationType, + String bulkInsertSortMode, boolean setSortColsinClusteringPlan, + boolean expectedEnforceRepartitionWithCoalesce) throws IOException { + HoodieTableMetaClient metaClient = + getHoodieMetaClient(storageConf(), URI.create(basePath()).getPath(), new Properties()); + HoodieWriteConfig writeConfig = getConfigBuilder(true) + .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true).withStreamingWriteEnabled(streamingWritesEnable).build()) + .withBulkInsertSortMode(bulkInsertSortMode) + .withPath(metaClient.getBasePath()) + .build(); + MockStreamingMetadataWriteHandler mockMetadataWriteHandler = new MockStreamingMetadataWriteHandler(); + + try (MockedStatic<ClusteringUtils> mocked = mockStatic(ClusteringUtils.class);) { + HoodieClusteringPlan clusteringPlan = mock(HoodieClusteringPlan.class); + HoodieClusteringStrategy clusteringStrategy = mock(HoodieClusteringStrategy.class); + when(clusteringPlan.getStrategy()).thenReturn(clusteringStrategy); + Map<String, String> strategyParams = new HashMap<>(); + if (setSortColsinClusteringPlan) { + strategyParams.put(PLAN_STRATEGY_SORT_COLUMNS.key(), "abc"); + } + when(clusteringStrategy.getStrategyParams()).thenReturn(strategyParams); + + HoodieInstant hoodieInstant = mock(HoodieInstant.class); + mocked.when(() -> ClusteringUtils.getClusteringPlan(any(), any())).thenReturn(Option.of(Pair.of(hoodieInstant, clusteringPlan))); + mocked.when(() -> ClusteringUtils.getRequestedClusteringInstant(any(), any(), any())).thenReturn(Option.of(hoodieInstant)); + + SparkRDDTableServiceClient tableServiceClient = new SparkRDDTableServiceClient(context(), writeConfig, Option.empty(), mockMetadataWriteHandler); + HoodieWriteMetadata<HoodieData<WriteStatus>> writeMetadata = mock(HoodieWriteMetadata.class); + HoodieData<WriteStatus> hoodieData = mock(HoodieData.class); + when(writeMetadata.getWriteStatuses()).thenReturn(hoodieData); + HoodieTable table = mock(HoodieTable.class); + when(table.getMetaClient()).thenReturn(metaClient); + tableServiceClient.partialUpdateTableMetadata(table, writeMetadata, "00001", writeOperationType); + assertEquals(expectedEnforceRepartitionWithCoalesce, mockMetadataWriteHandler.enforceCoalesceWithRepartition); + } + } + private void testAndAssertCompletionIsEarlierThanRequested(String basePath, Properties properties) throws IOException { HoodieTableMetaClient metaClient = getHoodieMetaClient(storageConf(), basePath, properties); @@ -225,4 +299,18 @@ class TestSparkRDDWriteClient extends SparkClientFunctionalTestHarness { }); } + class MockStreamingMetadataWriteHandler extends StreamingMetadataWriteHandler { + + boolean enforceCoalesceWithRepartition; + int coalesceDivisorForDataTableWrites; + + @Override + public HoodieData<WriteStatus> streamWriteToMetadataTable(HoodieTable table, HoodieData<WriteStatus> dataTableWriteStatuses, String instantTime, + boolean enforceCoalesceWithRepartition, int coalesceDivisorForDataTableWrites) { + 
this.enforceCoalesceWithRepartition = enforceCoalesceWithRepartition; + this.coalesceDivisorForDataTableWrites = coalesceDivisorForDataTableWrites; + return dataTableWriteStatuses; + } + } + } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestStreamingMetadataWriteHandler.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestStreamingMetadataWriteHandler.java new file mode 100644 index 000000000000..39ac8bdba74a --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestStreamingMetadataWriteHandler.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.client; + +import org.apache.hudi.common.data.HoodieData; +import org.apache.hudi.common.engine.HoodieEngineContext; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.data.HoodieJavaRDD; +import org.apache.hudi.metadata.HoodieTableMetadataWriter; +import org.apache.hudi.storage.StoragePath; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.testutils.SparkClientFunctionalTestHarness; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Stream; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestStreamingMetadataWriteHandler extends SparkClientFunctionalTestHarness { + + private final HoodieTable<?, ?, ?, ?> mockHoodieTable = mock(HoodieTable.class); + private HoodieTableMetaClient metaClient; + + @BeforeEach + void setUp() { + metaClient = mock(HoodieTableMetaClient.class); + when(metaClient.getBasePath()).thenReturn(new StoragePath("/tmp/")); + when(mockHoodieTable.getMetaClient()).thenReturn(metaClient); + HoodieEngineContext engineContext = mock(HoodieEngineContext.class); + when(mockHoodieTable.getContext()).thenReturn(engineContext); + } + + private static Stream<Arguments> coalesceDivisorTestArgs() { + return Arrays.stream(new Object[][] { + {100, 20, 1000, true}, + {100, 20, 1000, false}, + {1, 1, 1000, true}, + {1, 1, 1000, false}, + {10000, 100, 5000, true}, + {10000, 100, 5000, true}, + {10000, 100, 20000, true}, + {10000, 100, 20000, true} + }).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("coalesceDivisorTestArgs") + public void testCoalesceDividentConfig(int numDataTableWriteStatuses, int numMdtWriteStatus, int 
coalesceDividentForDataTableWrites, + boolean enforceCoalesceWithRepartition) { + HoodieData<WriteStatus> dataTableWriteStatus = mockWriteStatuses(numDataTableWriteStatuses); + HoodieData<WriteStatus> mdtWriteStatus = mockWriteStatuses(numMdtWriteStatus); + HoodieTableMetadataWriter mdtWriter = mock(HoodieTableMetadataWriter.class); + when(mdtWriter.streamWriteToMetadataPartitions(any(), any())).thenReturn(mdtWriteStatus); + StreamingMetadataWriteHandler metadataWriteHandler = new MockStreamingMetadataWriteHandler(mdtWriter); + + HoodieData<WriteStatus> allWriteStatuses = metadataWriteHandler.streamWriteToMetadataTable(mockHoodieTable, dataTableWriteStatus, "00001", enforceCoalesceWithRepartition, + coalesceDividentForDataTableWrites); + assertEquals(Math.max(1, numDataTableWriteStatuses / coalesceDividentForDataTableWrites) + numMdtWriteStatus, allWriteStatuses.getNumPartitions()); + } + + private HoodieData<WriteStatus> mockWriteStatuses(int size) { + List<WriteStatus> writeStatuses = new ArrayList<>(); + for (int i = 0; i < size; i++) { + writeStatuses.add(mock(WriteStatus.class)); + } + return HoodieJavaRDD.of(jsc().parallelize(writeStatuses, size)); + } + + class MockStreamingMetadataWriteHandler extends StreamingMetadataWriteHandler { + + private HoodieTableMetadataWriter mdtWriter; + + MockStreamingMetadataWriteHandler(HoodieTableMetadataWriter mdtWriter) { + this.mdtWriter = mdtWriter; + } + + @Override + synchronized Option<HoodieTableMetadataWriter> getMetadataWriter(String triggeringInstant, HoodieTable table) { + return Option.of(mdtWriter); + } + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java index 2ba497264a5b..16380e18cdd5 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/data/TestHoodieJavaRDD.java @@ -71,6 +71,31 @@ public class TestHoodieJavaRDD extends HoodieClientTestBase { assertEquals(11, shuffleRDD.deduceNumPartitions()); } + @Test + public void testRepartitionAndCoalesce() { + int numPartitions = 100; + // rdd parallelize + HoodieData<Integer> rddData = HoodieJavaRDD.of(jsc.parallelize( + IntStream.rangeClosed(0, 100).boxed().collect(Collectors.toList()), numPartitions)); + assertEquals(100, rddData.getNumPartitions()); + + // repartition by 10. + rddData = rddData.repartition(10); + assertEquals(10, rddData.getNumPartitions()); + + // coalesce to 5 + rddData = rddData.coalesce(5); + assertEquals(5, rddData.getNumPartitions()); + + // repartition to 20 + rddData = rddData.repartition(20); + assertEquals(20, rddData.getNumPartitions()); + + // but colesce may not expand the num partitions + rddData = rddData.coalesce(40); + assertEquals(20, rddData.getNumPartitions()); + } + @Test void testMapPartitionsWithCloseable() { String partition1 = "partition1"; diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java index 70b0365dd67e..96c95708ab75 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java @@ -81,6 +81,15 @@ public final class HoodieMetadataConfig extends HoodieConfig { + "in streaming manner rather than two disjoint writes. 
By default " + "streaming writes to metadata table is enabled for SPARK engine for incremental operations and disabled for all other cases."); + public static final ConfigProperty<Integer> STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR = ConfigProperty + .key(METADATA_PREFIX + ".streaming.write.datatable.write.statuses.coalesce.divisor") + .defaultValue(5000) + .markAdvanced() + .sinceVersion("1.1.0") + .withDocumentation("When streaming writes to metadata table is enabled via hoodie.metadata.streaming.write.enabled, the data table write statuses are unioned " + + "with metadata table write statuses before triggering the entire write DAG. The data table write statuses will be coalesced down to the number of write status " + + "partitions divided by the specified divisor to avoid triggering thousands of no-op tasks for the data table writes which have their status cached."); + public static final boolean DEFAULT_METADATA_ENABLE_FOR_READERS = true; // Enable metrics for internal Metadata Table @@ -609,6 +618,10 @@ public final class HoodieMetadataConfig extends HoodieConfig { return getBoolean(STREAMING_WRITE_ENABLED); } + public int getStreamingWritesCoalesceDivisorForDataTableWrites() { + return getInt(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR); + } + public boolean isBloomFilterIndexEnabled() { return getBooleanOrDefault(ENABLE_METADATA_INDEX_BLOOM_FILTER); } diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java index a0b2ce75e967..cd95ee23a7e6 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieData.java @@ -220,6 +220,15 @@ public interface HoodieData<T> extends Serializable { */ HoodieData<T> repartition(int parallelism); + /** + * Coalesces the underlying collection (if applicable), making sure the new {@link HoodieData} has + * at most {@code parallelism} partitions.
+ * + * @param parallelism target number of partitions in the underlying collection + * @return {@link HoodieData<T>} holding coalesced collection + */ + HoodieData<T> coalesce(int parallelism); + default <O> HoodieData<T> distinctWithKey(SerializableFunction<T, O> keyGetter, int parallelism) { return mapToPair(i -> Pair.of(keyGetter.apply(i), i)) .reduceByKey((value1, value2) -> value1, parallelism) diff --git a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java index 7fd16af82382..6da031e1a5de 100644 --- a/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java +++ b/hudi-common/src/main/java/org/apache/hudi/common/data/HoodieListData.java @@ -197,6 +197,12 @@ public class HoodieListData<T> extends HoodieBaseListData<T> implements HoodieDa return this; } + @Override + public HoodieData<T> coalesce(int parallelism) { + // no op + return this; + } + @Override public boolean isEmpty() { return super.isEmpty(); diff --git a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java index eb97a4c4a345..73d705df907d 100644 --- a/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java +++ b/hudi-common/src/test/java/org/apache/hudi/common/config/TestHoodieMetadataConfig.java @@ -59,4 +59,27 @@ class TestHoodieMetadataConfig { .build(); assertEquals(-50, configWithNegativeValue.getRecordPreparationParallelism()); } + + @Test + void testStreamingWritesCoalesceDivisorForDataTableWrites() { + // Test default value + HoodieMetadataConfig config = HoodieMetadataConfig.newBuilder().build(); + assertEquals(5000, config.getStreamingWritesCoalesceDivisorForDataTableWrites()); + + // Test custom value + Properties props = new Properties(); + props.put(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR.key(), "1"); + HoodieMetadataConfig configWithCustomValue = HoodieMetadataConfig.newBuilder() + .fromProperties(props) + .build(); + assertEquals(1, configWithCustomValue.getStreamingWritesCoalesceDivisorForDataTableWrites()); + + Properties propsLarge = new Properties(); + propsLarge.put(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR.key(), "10000"); + HoodieMetadataConfig configWithLargeValue = HoodieMetadataConfig.newBuilder() + .fromProperties(propsLarge) + .build(); + assertEquals(10000, configWithLargeValue.getStreamingWritesCoalesceDivisorForDataTableWrites()); + } + }
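
A note for readers tracing the new DAG: the commit persists the metadata table write statuses, shrinks the data table write statuses, and only then unions the two; for bulk insert and clustering with sort mode NONE, the shrink goes through a keyed partitionBy (CoalescingPartitioner) instead of a plain coalesce, so the shuffle boundary keeps the upstream record-key-generation stages at their original parallelism. The sketch below is a minimal, self-contained illustration of that partitionBy trick; plain strings stand in for WriteStatus, and FixedPartitioner is a hypothetical mirror of CoalescingPartitioner, not the class from this patch.

import org.apache.spark.Partitioner;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

import java.util.ArrayList;
import java.util.List;

public class CoalesceViaPartitionerSketch {

  // Same shape as the CoalescingPartitioner added by this commit: a fixed, small partition count.
  static class FixedPartitioner extends Partitioner {
    private final int numPartitions;

    FixedPartitioner(int numPartitions) {
      this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
      return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
      return numPartitions == 1 ? 0 : Math.abs(key.hashCode()) % numPartitions;
    }
  }

  public static void main(String[] args) {
    JavaSparkContext jsc = new JavaSparkContext("local[4]", "coalescing-partitioner-sketch");
    try {
      List<String> paths = new ArrayList<>();
      for (int i = 0; i < 1000; i++) {
        paths.add("partition/file-" + i + ".parquet");
      }
      // Stand-in for the data table write statuses, spread across many small partitions.
      JavaRDD<String> writeStatuses = jsc.parallelize(paths, 200);

      // A plain coalesce(1) is a narrow dependency and can drag the upstream stage down to one task;
      // keying by path and shuffling through a fixed-size partitioner cuts the chain instead.
      JavaRDD<String> squeezed = writeStatuses
          .mapToPair(status -> new Tuple2<>(status, status))
          .partitionBy(new FixedPartitioner(1))
          .map(tuple -> tuple._2());

      System.out.println("partitions after squeeze: " + squeezed.getNumPartitions()); // prints 1
    } finally {
      jsc.stop();
    }
  }
}

The design point is that partitionBy introduces a stage boundary, so shrinking the downstream partition count cannot narrow the stages that produced the write statuses, which a plain coalesce(1) could.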
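
The new hoodie.metadata.streaming.write.datatable.write.statuses.coalesce.divisor option (default 5000) controls how aggressively the data table write statuses are shrunk before the union: the handler uses max(1, numWriteStatusPartitions / divisor) as the coalesce parallelism. A small sketch of that arithmetic, using only builder and getter methods that appear in this commit and its tests; the surrounding wiring is illustrative.

import org.apache.hudi.common.config.HoodieMetadataConfig;

import java.util.Properties;

public class CoalesceDivisorSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Override the default of 5000, the same way the new TestHoodieMetadataConfig case does.
    props.put(HoodieMetadataConfig.STREAMING_WRITE_DATATABLE_WRITE_STATUSES_COALESCE_DIVISOR.key(), "1000");

    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withStreamingWriteEnabled(true)
        .fromProperties(props)
        .build();
    int divisor = metadataConfig.getStreamingWritesCoalesceDivisorForDataTableWrites();

    // Mirrors StreamingMetadataWriteHandler: coalesceParallelism = max(1, numPartitions / divisor).
    // With a divisor of 1000: 100 -> 1, 10000 -> 10, 20000 -> 20.
    for (int numPartitions : new int[] {100, 10_000, 20_000}) {
      int coalesceParallelism = Math.max(1, numPartitions / divisor);
      System.out.println(numPartitions + " write status partitions -> coalesce(" + coalesceParallelism + ")");
    }
  }
}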
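
Putting the two call sites side by side, the repartition-based shrink is only enforced for BULK_INSERT with sort mode NONE, and for CLUSTER with sort mode NONE when the clustering plan carries no sort columns; compaction and log compaction always take the plain coalesce. The helper below is a hypothetical restatement of that rule (it is not a method in the patch), consistent with the expectations encoded in testStreamingMetadataWrites.

import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;

public class EnforceRepartitionRuleSketch {

  // hasSortColumnsInClusteringPlan stands in for the clustering-plan lookup done in
  // SparkRDDTableServiceClient.partialUpdateTableMetadata.
  static boolean enforceCoalesceWithRepartition(WriteOperationType op,
                                                BulkInsertSortMode sortMode,
                                                boolean hasSortColumnsInClusteringPlan) {
    if (sortMode != BulkInsertSortMode.NONE) {
      // any other sort mode already introduces a shuffle upstream, so a plain coalesce is safe
      return false;
    }
    if (op == WriteOperationType.BULK_INSERT) {
      return true;                              // SparkRDDWriteClient path
    }
    if (op == WriteOperationType.CLUSTER) {
      return !hasSortColumnsInClusteringPlan;   // SparkRDDTableServiceClient path
    }
    return false;                               // compaction and log compaction use the plain coalesce
  }

  public static void main(String[] args) {
    System.out.println(enforceCoalesceWithRepartition(WriteOperationType.CLUSTER, BulkInsertSortMode.NONE, false));  // true
    System.out.println(enforceCoalesceWithRepartition(WriteOperationType.CLUSTER, BulkInsertSortMode.NONE, true));   // false
    System.out.println(enforceCoalesceWithRepartition(WriteOperationType.COMPACT, BulkInsertSortMode.NONE, false));  // false
  }
}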