This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 38feb9be595 [HUDI-8985] Fix avg record size estimator to avoid parsing 
all commit metadata of active timeline (#12803)
38feb9be595 is described below

commit 38feb9be5950dbb2b049e5c2f26636dccfd5aeb9
Author: Davis-Zhang-Onehouse 
<[email protected]>
AuthorDate: Fri Feb 14 10:30:39 2025 -0800

    [HUDI-8985] Fix avg record size estimator to avoid parsing all commit 
metadata of active timeline (#12803)
---
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  51 ++++
 .../hudi/estimator/AverageRecordSizeEstimator.java | 134 ++++++++++
 .../apache/hudi/estimator/RecordSizeEstimator.java |  48 ++++
 .../hudi/estimator/RecordSizeEstimatorFactory.java |  45 ++++
 .../estimator/TestAverageRecordSizeEstimator.java  | 279 +++++++++++++++++++++
 .../estimator/TestRecordSizeEstimatorFactory.java  |  72 ++++++
 .../action/commit/AverageRecordSizeUtils.java      |  91 -------
 .../table/action/commit/UpsertPartitioner.java     |   8 +-
 .../action/commit/TestAverageRecordSizeUtils.java  | 219 ----------------
 .../table/action/commit/TestUpsertPartitioner.java |  92 -------
 .../versioning/DefaultInstantComparator.java       |  24 ++
 .../hudi/common/testutils/HoodieTestUtils.java     |   3 +
 12 files changed, 662 insertions(+), 404 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e57e9b29a83..e420a6e7ad0 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -67,6 +67,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
 import org.apache.hudi.config.metrics.HoodieMetricsM3Config;
+import org.apache.hudi.estimator.AverageRecordSizeEstimator;
 import org.apache.hudi.exception.HoodieNotSupportedException;
 import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
 import org.apache.hudi.index.HoodieIndex;
@@ -211,6 +212,29 @@ public class HoodieWriteConfig extends HoodieConfig {
       .withDocumentation("Key generator class, that implements 
`org.apache.hudi.keygen.KeyGenerator` "
           + "extract a key out of incoming records.");
 
+  public static final ConfigProperty<String> RECORD_SIZE_ESTIMATOR_CLASS_NAME 
= ConfigProperty
+      .key("hoodie.record.size.estimator.class")
+      .defaultValue(AverageRecordSizeEstimator.class.getName())
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("Class that estimates the size of records written by 
implementing "
+          + "`org.apache.hudi.estimator.RecordSizeEstimator`. Default 
implementation is `org.apache.hudi.estimator.AverageRecordSizeEstimator`");
+
+  public static final ConfigProperty<Integer> 
RECORD_SIZE_ESTIMATOR_MAX_COMMITS = ConfigProperty
+      .key("_hoodie.record.size.estimator.max.commits")
+      .defaultValue(5)
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("The maximum number of commits that will be read to 
estimate the avg record size. "
+          + "This makes sure we parse a limited number of commit metadata, as 
parsing the entire active timeline can be expensive and unnecessary.");
+
+  public static final ConfigProperty<String> 
RECORD_SIZE_ESTIMATOR_AVERAGE_METADATA_SIZE = ConfigProperty
+      .key("hoodie.record.size.estimator.average.metadata.size")
+      .defaultValue("0")
+      .markAdvanced()
+      .sinceVersion("1.0.0")
+      .withDocumentation("The approximate metadata size in bytes to subtract 
from the file size when estimating the record size.");
+
   public static final ConfigProperty<String> WRITE_EXECUTOR_TYPE = 
ConfigProperty
       .key("hoodie.write.executor.type")
       .defaultValue(ExecutorType.SIMPLE.name())
@@ -1649,6 +1673,18 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getInt(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE);
   }
 
+  public String getRecordSizeEstimator() {
+    return getStringOrDefault(RECORD_SIZE_ESTIMATOR_CLASS_NAME);
+  }
+
+  public int getRecordSizeEstimatorMaxCommits() {
+    return getIntOrDefault(RECORD_SIZE_ESTIMATOR_MAX_COMMITS);
+  }
+
+  public long getRecordSizeEstimatorAverageMetadataSize() {
+    return 
Long.parseLong(getStringOrDefault(RECORD_SIZE_ESTIMATOR_AVERAGE_METADATA_SIZE));
+  }
+
   public boolean allowMultipleCleans() {
     return getBoolean(HoodieCleanConfig.ALLOW_MULTIPLE_CLEANS);
   }
@@ -3016,6 +3052,21 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withRecordSizeEstimator(String recordSizeEstimator) {
+      writeConfig.setValue(RECORD_SIZE_ESTIMATOR_CLASS_NAME, 
recordSizeEstimator);
+      return this;
+    }
+
+    public Builder withRecordSizeEstimatorMaxCommits(int maxCommits) {
+      writeConfig.setValue(RECORD_SIZE_ESTIMATOR_MAX_COMMITS, 
String.valueOf(maxCommits));
+      return this;
+    }
+
+    public Builder withRecordSizeEstimatorAverageMetadataSize(long 
avgMetadataSize) {
+      writeConfig.setValue(RECORD_SIZE_ESTIMATOR_AVERAGE_METADATA_SIZE, 
String.valueOf(avgMetadataSize));
+      return this;
+    }
+
     public Builder withExecutorType(String executorClass) {
       writeConfig.setValue(WRITE_EXECUTOR_TYPE, executorClass);
       return this;
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java
new file mode 100644
index 00000000000..ec43ca76526
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.estimator;
+
+import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StoragePath;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+
+/**
+ * Default Implementation for the {@link RecordSizeEstimator}.
+ * Estimates the average record sizes based on the stats from the previous X commits and deltacommits.
+ * X is configured using hoodieWriteConfig.getRecordSizeEstimatorMaxCommits().
+ * <p>
+ * Currently, we will estimate the avg record sizes only from candidate files from the commit metadata. Candidate
+ * files are selected files that exceed a threshold size, to avoid measurement errors. Optionally, we can
+ * configure the expected metadata size of the file so that it can be accounted for.
+ */
+public class AverageRecordSizeEstimator extends RecordSizeEstimator {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AverageRecordSizeEstimator.class);
+  /*
+   * NOTE: we only use commit instants to calculate the average record size, because a replacecommit can be
+   * created by clustering, which has a smaller average record size; that skews the assignment of inserts and
+   * may result in OOM by making Spark underestimate the actual input record sizes.
+   */
+  private static final Set<String> RECORD_SIZE_ESTIMATE_ACTIONS = 
CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, 
COMPACTION_ACTION);
+
+  public AverageRecordSizeEstimator(HoodieWriteConfig writeConfig) {
+    super(writeConfig);
+  }
+
+  @Override
+  public long averageBytesPerRecord(HoodieTimeline commitTimeline, 
CommitMetadataSerDe commitMetadataSerDe) {
+    int maxCommits = hoodieWriteConfig.getRecordSizeEstimatorMaxCommits();
+    final AverageRecordSizeStats averageRecordSizeStats = new 
AverageRecordSizeStats(hoodieWriteConfig);
+    try {
+      if (!commitTimeline.empty()) {
+        // Go over the reverse ordered commits to get a more recent estimate 
of average record size.
+        Stream<HoodieInstant> filteredInstants = 
commitTimeline.filterCompletedInstants()
+            .getReverseOrderedInstants()
+            .filter(s -> RECORD_SIZE_ESTIMATE_ACTIONS.contains(s.getAction()))
+            .limit(maxCommits);
+        filteredInstants
+            .forEach(instant -> {
+              HoodieCommitMetadata commitMetadata;
+              try {
+                commitMetadata = commitMetadataSerDe
+                    .deserialize(instant, 
commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+                if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
+                  // let's consider only base files in case of delta commits
+                  commitMetadata.getWriteStats().stream().parallel()
+                      .filter(hoodieWriteStat -> FSUtils.isBaseFile(new 
StoragePath(hoodieWriteStat.getPath())))
+                      .forEach(hoodieWriteStat -> 
averageRecordSizeStats.updateStats(hoodieWriteStat.getTotalWriteBytes(), 
hoodieWriteStat.getNumWrites()));
+                } else {
+                  
averageRecordSizeStats.updateStats(commitMetadata.fetchTotalBytesWritten(), 
commitMetadata.fetchTotalRecordsWritten());
+                }
+              } catch (IOException ignore) {
+                LOG.info("Failed to parse commit metadata", ignore);
+              }
+            });
+      }
+    } catch (Throwable t) {
+      LOG.warn("Got error while trying to compute average bytes/record but 
will proceed to use the computed value "
+          + "or fallback to default config value ", t);
+    }
+    return averageRecordSizeStats.computeAverageRecordSize();
+  }
+
+  private static class AverageRecordSizeStats implements Serializable {
+    private final HoodieAtomicLongAccumulator totalBytesWritten;
+    private final HoodieAtomicLongAccumulator totalRecordsWritten;
+    private final long fileSizeThreshold;
+    private final long avgMetadataSize;
+    private final int defaultRecordSize;
+
+    public AverageRecordSizeStats(HoodieWriteConfig hoodieWriteConfig) {
+      totalBytesWritten = HoodieAtomicLongAccumulator.create();
+      totalRecordsWritten = HoodieAtomicLongAccumulator.create();
+      fileSizeThreshold = (long) 
(hoodieWriteConfig.getRecordSizeEstimationThreshold() * 
hoodieWriteConfig.getParquetSmallFileLimit());
+      avgMetadataSize = 
hoodieWriteConfig.getRecordSizeEstimatorAverageMetadataSize();
+      defaultRecordSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
+    }
+
+    private void updateStats(long fileSizeInBytes, long recordWritten) {
+      if (fileSizeInBytes > fileSizeThreshold && fileSizeInBytes > 
avgMetadataSize && recordWritten > 0) {
+        totalBytesWritten.add(fileSizeInBytes - avgMetadataSize);
+        totalRecordsWritten.add(recordWritten);
+      }
+    }
+
+    private long computeAverageRecordSize() {
+      if (totalBytesWritten.value() > 0 && totalRecordsWritten.value() > 0) {
+        return totalBytesWritten.value() / totalRecordsWritten.value();
+      }
+      // Fallback to default implementation in the cases where we either got an exception before we could
+      // compute the average record size or there are no eligible commits yet.
+      return defaultRecordSize;
+    }
+  }
+}
\ No newline at end of file
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimator.java
new file mode 100644
index 00000000000..c9fc26a30cd
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.estimator;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.io.Serializable;
+
+/**
+ * Abstract class to compute the estimated record size used for assigning 
records to spark partitions
+ * for file writing.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public abstract class RecordSizeEstimator implements Serializable {
+
+  protected final HoodieWriteConfig hoodieWriteConfig;
+
+  protected RecordSizeEstimator(HoodieWriteConfig config) {
+    this.hoodieWriteConfig = config;
+  }
+
+  /**
+   * Estimates the average size in bytes of a single record, based on the given commit timeline.
+   */
+  @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+  public abstract long averageBytesPerRecord(HoodieTimeline commitTimeline, 
CommitMetadataSerDe commitMetadataSerDe);
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimatorFactory.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimatorFactory.java
new file mode 100644
index 00000000000..6fd99e8ca25
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimatorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.estimator;
+
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory class to instantiate the configured implementation of {@link 
RecordSizeEstimator}.
+ * Since record estimation is best effort, we use the {@link 
AverageRecordSizeEstimator} (default implementation)
+ * as a backup in case of fatal exceptions.
+ */
+public class RecordSizeEstimatorFactory {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(RecordSizeEstimatorFactory.class);
+
+  public static RecordSizeEstimator 
createRecordSizeEstimator(HoodieWriteConfig writeConfig) {
+    String recordSizeEstimatorClass = writeConfig.getRecordSizeEstimator();
+    try {
+      return (RecordSizeEstimator) 
ReflectionUtils.loadClass(recordSizeEstimatorClass, writeConfig);
+    } catch (Throwable e) {
+      LOG.warn("Unable to instantiate the record estimator implementation {}. 
Falling back to use default AverageRecordSizeEstimator.\" ", 
recordSizeEstimatorClass, e);
+    }
+    return new AverageRecordSizeEstimator(writeConfig);
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java
new file mode 100644
index 00000000000..5b7610ecdd3
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.estimator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.v2.BaseTimelineV2;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_COMPARATOR;
+import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static 
org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test average record size estimation.
+ */
+public class TestAverageRecordSizeEstimator {
+  private static final String BASE_FILE_EXTENSION =
+      HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
+  private final HoodieTimeline mockTimeline = mock(HoodieTimeline.class);
+  private final CommitMetadataSerDe mockCommitMetadataSerDe = 
mock(CommitMetadataSerDe.class);
+  private static final String PARTITION1 = "partition1";
+  private static final String TEST_WRITE_TOKEN = "1-0-1";
+  private static final Integer DEFAULT_MAX_COMMITS = 2;
+  private static final Integer DEFAULT_MAX_PARQUET_METADATA_SIZE = 1000;
+  private static final Double DEFAULT_RECORD_SIZE_ESTIMATE_THRESHOLD = 0.1;
+
+  @Test
+  public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws 
Exception {
+    HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
+    HoodieWriteConfig config = 
HoodieWriteConfig.newBuilder().withPath("/tmp").build();
+    when(commitTimeLine.empty()).thenReturn(true);
+    long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate();
+    AverageRecordSizeEstimator averageRecordSizeEstimator = new 
AverageRecordSizeEstimator(config);
+    long actualAvgSize = 
averageRecordSizeEstimator.averageBytesPerRecord(commitTimeLine, 
COMMIT_METADATA_SER_DE);
+    assertEquals(expectAvgSize, actualAvgSize);
+  }
+
+  @Test
+  public void testErrorHandling() {
+    int recordSize = 10000;
+    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
+        
.withProps(Collections.singletonMap(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), 
String.valueOf(recordSize)))
+        .build(false);
+    BaseTimelineV2 commitsTimeline = new BaseTimelineV2();
+    List<HoodieInstant> instants = Collections.singletonList(
+        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "1"));
+
+    when(mockTimeline.getInstants()).thenReturn(instants);
+    when(mockTimeline.getReverseOrderedInstants()).then(i -> 
instants.stream());
+    // Simulate a case where the instant details are absent
+    commitsTimeline.setInstants(new ArrayList<>());
+    AverageRecordSizeEstimator averageRecordSizeEstimator = new 
AverageRecordSizeEstimator(writeConfig);
+    long actualAvgSize = 
averageRecordSizeEstimator.averageBytesPerRecord(mockTimeline, 
COMMIT_METADATA_SER_DE);
+    assertEquals(recordSize, actualAvgSize);
+  }
+
+  @ParameterizedTest
+  @MethodSource("testCases")
+  public void 
testAverageRecordSizeWithNonEmptyCommitTimeline(List<Pair<HoodieInstant, 
List<HWriteStat>>> instantSizePairs, long expectedSize) {
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .withRecordSizeEstimator(AverageRecordSizeEstimator.class.getName())
+        .withRecordSizeEstimatorMaxCommits(DEFAULT_MAX_COMMITS)
+        
.withRecordSizeEstimatorAverageMetadataSize(DEFAULT_MAX_PARQUET_METADATA_SIZE)
+        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+            
.compactionRecordSizeEstimateThreshold(DEFAULT_RECORD_SIZE_ESTIMATE_THRESHOLD)
+            .build())
+        .build();
+
+    List<HoodieInstant> instants = new ArrayList<>();
+    instantSizePairs.forEach(entry -> {
+      HoodieInstant hoodieInstant = entry.getKey();
+      HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+      entry.getValue().forEach(hWriteStat -> {
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setNumWrites(hWriteStat.getTotalRecordsWritten());
+        writeStat.setTotalWriteBytes(hWriteStat.getPerRecordSize() * 
hWriteStat.getTotalRecordsWritten());
+        writeStat.setPath(hWriteStat.getPath());
+        commitMetadata.addWriteStat(PARTITION1, writeStat);
+      });
+      instants.add(hoodieInstant);
+      try {
+        when(mockTimeline.getInstantDetails(hoodieInstant))
+            
.thenReturn(org.apache.hudi.common.util.Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
+        when(mockCommitMetadataSerDe.deserialize(hoodieInstant, 
mockTimeline.getInstantDetails(hoodieInstant).get(), 
HoodieCommitMetadata.class))
+            .thenReturn(commitMetadata);
+      } catch (IOException e) {
+        throw new RuntimeException("Should not have failed", e);
+      }
+    });
+
+    List<HoodieInstant> reverseOrderInstants = new ArrayList<>(instants);
+    Collections.reverse(reverseOrderInstants);
+
+    when(mockTimeline.filterCompletedInstants()).thenReturn(mockTimeline);
+    when(mockTimeline.getReverseOrderedInstants()).then(i -> 
reverseOrderInstants.stream());
+
+    AverageRecordSizeEstimator averageRecordSizeEstimator = new 
AverageRecordSizeEstimator(writeConfig);
+    long actualSize = 
averageRecordSizeEstimator.averageBytesPerRecord(mockTimeline, 
mockCommitMetadataSerDe);
+    assertEquals(expectedSize, actualSize);
+  }
+
+  private static String getBaseFileName(String instantTime) {
+    String fileName = UUID.randomUUID().toString();
+    return FSUtils.makeBaseFileName(instantTime, TEST_WRITE_TOKEN, fileName, 
BASE_FILE_EXTENSION);
+  }
+
+  private static String getLogFileName(String instantTime) {
+    String fileName = UUID.randomUUID().toString();
+    String fullFileName = FSUtils.makeBaseFileName(instantTime, 
TEST_WRITE_TOKEN, fileName, BASE_FILE_EXTENSION);
+    assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
+    return FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), 
instantTime, 1, TEST_WRITE_TOKEN);
+  }
+
+  private static Stream<Arguments> testCases() {
+    Long baseInstant = 20231204194919610L;
+    List<Arguments> arguments = new ArrayList<>();
+    // Note the avg record estimate is based on a parquet metadata size of 
500Bytes per file.
+    // 1. straight forward. just 1 instant.
+    arguments.add(Arguments.of(
+        
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, 
Long.toString(baseInstant)),
+            Collections.singletonList(generateBaseWriteStat(baseInstant, 
10000000L, 100L)))), 99L));
+
+    // 2. two instants. avg of both the instants
+    arguments.add(Arguments.of(
+        
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, 
Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant, 
10000000L, 100L))),
+            Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, 
Long.toString(baseInstant + 100)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 
100, 1000000L, 200L)))), 109L));
+
+    // 3. two instants, latest commit has a small file that's just above the threshold, while the earliest commit is fully ignored,
+    // since it is below the threshold size limit
+    arguments.add(Arguments.of(
+        
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, 
Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant, 
9000L, 1000L))),
+            
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, 
Long.toString(baseInstant + 100)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 
100, 110000, 100L)))), 99L));
+
+    // 4. 2nd instant is a replace commit; it should be excluded, so the avg comes from the remaining commit.
+    arguments.add(Arguments.of(
+        
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, 
Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant, 
10000000L, 100L))),
+            
Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, 
Long.toString(baseInstant + 100)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 
100, 10000000L, 200L)))), 99L));
+
+    // 5. for delta commits, only parquet files should be accounted for.
+    arguments.add(Arguments.of(
+        
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, 
Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant, 
10000000L, 100L))),
+            
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, 
Long.toString(baseInstant + 100)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 
100, 10000000L, 200L)))), 149L));
+
+    // 6. delta commit has a mix of parquet and log files. only parquet files 
should be accounted for.
+    arguments.add(Arguments.of(
+        
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
 Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant, 
1000000L, 100L))),
+            
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, 
Long.toString(baseInstant + 100)),
+                Arrays.asList(generateBaseWriteStat(baseInstant + 100, 
10000000L, 200L),
+                    generateLogWriteStat(baseInstant + 100, 10000000L, 
300L)))), 190L));
+
+    // 7. 2nd delta commit only has log files, and so we honor the 1st delta commit size.
+    arguments.add(Arguments.of(
+        
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
 Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant, 
10000000L, 100L))),
+            
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, 
Long.toString(baseInstant + 100)),
+                Arrays.asList(generateLogWriteStat(baseInstant + 100, 
1000000L, 200L),
+                    generateLogWriteStat(baseInstant + 100, 10000000L, 
300L)))), 99L));
+
+    // 8. since the default max commits is overridden to 2 commits, ignore the earliest commit here since there are 3 commits in total
+    arguments.add(Arguments.of(
+        
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, 
Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant, 
10000000L, 1000L))),
+            Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, 
Long.toString(baseInstant + 100)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 
100, 10000000L, 50L))),
+            
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, 
Long.toString(baseInstant + 200)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 
200, 10000000L, 100L)))), 74L));
+
+    // 9. replace commits should be ignored despite being the latest commits.
+    arguments.add(Arguments.of(
+        
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
 Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant, 
1000000L, 100L))),
+            
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, 
Long.toString(baseInstant + 100)),
+                Arrays.asList(generateLogWriteStat(baseInstant + 100, 
1000000L, 200L),
+                    generateLogWriteStat(baseInstant + 100, 1000000L, 300L))),
+            
Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, 
Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 
200, 1000000L, 2000L))),
+            
Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION, 
Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 
300, 1000000L, 3000L)))), 99L));
+
+    // 10. Ignore commit stat with 0 records
+    arguments.add(Arguments.of(
+        
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, 
Long.toString(baseInstant)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant, 
10000000L, 1000L))),
+            Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION, 
Long.toString(baseInstant + 100)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 
100, 10000000L, 50L))),
+            
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION, 
Long.toString(baseInstant + 200)),
+                Collections.singletonList(generateBaseWriteStat(baseInstant + 
200, 0L, 1000L)))), 49L));
+
+    return arguments.stream();
+  }
+
+  private static HoodieInstant generateCompletedInstant(String action, String 
instant) {
+    return new HoodieInstant(HoodieInstant.State.COMPLETED, action, instant, 
INSTANT_COMPARATOR.requestedTimeOrderedComparator());
+  }
+
+  private static HWriteStat generateBaseWriteStat(long instant, long 
totalRecordsWritten, long perRecordSize) {
+    return new HWriteStat(getBaseFileName(String.valueOf(instant)), 
totalRecordsWritten, perRecordSize);
+  }
+
+  private static HWriteStat generateLogWriteStat(long instant, long 
totalRecordsWritten, long perRecordSize) {
+    return new HWriteStat(getLogFileName(String.valueOf(instant)), 
totalRecordsWritten, perRecordSize);
+  }
+
+  static class HWriteStat {
+    private final String path;
+    private final Long totalRecordsWritten;
+    private final Long perRecordSize;
+
+    public HWriteStat(String path, Long totalRecordsWritten, Long 
perRecordSize) {
+      this.path = path;
+      this.totalRecordsWritten = totalRecordsWritten;
+      this.perRecordSize = perRecordSize;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public Long getTotalRecordsWritten() {
+      return totalRecordsWritten;
+    }
+
+    public Long getPerRecordSize() {
+      return perRecordSize;
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestRecordSizeEstimatorFactory.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestRecordSizeEstimatorFactory.java
new file mode 100644
index 00000000000..283c9f306ce
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestRecordSizeEstimatorFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.estimator;
+
+import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.parquet.Strings;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestRecordSizeEstimatorFactory {
+
+  @ParameterizedTest
+  @MethodSource("differentEstimatorImplementations")
+  public void testRecordSizeEstimatorFactoryWithCustomConfig(String className, 
Class<?> clazz) {
+    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+        .withPath("/tmp");
+    if (!Strings.isNullOrEmpty(className)) {
+      builder.withRecordSizeEstimator(className);
+    }
+
+    RecordSizeEstimator estimator = 
RecordSizeEstimatorFactory.createRecordSizeEstimator(builder.build());
+    assertTrue(clazz.isInstance(estimator));
+  }
+
+  static Stream<Arguments> differentEstimatorImplementations() {
+    List<Arguments> arguments = new ArrayList<>();
+    // Explicitly configured
+    arguments.add(Arguments.of(TestRecordSizeEstimator.class.getName(), 
TestRecordSizeEstimator.class));
+    // Not configured
+    arguments.add(Arguments.of(null, AverageRecordSizeEstimator.class));
+    // Incorrectly configured
+    
arguments.add(Arguments.of("org.apache.hudi.estimator.IncorrectRecordEstimator",
 AverageRecordSizeEstimator.class));
+    return arguments.stream();
+  }
+
+  public static class TestRecordSizeEstimator extends RecordSizeEstimator {
+    public TestRecordSizeEstimator(HoodieWriteConfig config) {
+      super(config);
+    }
+
+    @Override
+    public long averageBytesPerRecord(HoodieTimeline commitTimeline, 
CommitMetadataSerDe commitMetadataSerDe) {
+      return 0;
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
deleted file mode 100644
index 8873fb01c97..00000000000
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.table.action.commit;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.storage.StoragePath;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
-import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
-
-/**
- * Util class to assist with fetching average record size.
- */
-public class AverageRecordSizeUtils {
-  private static final Logger LOG = 
LoggerFactory.getLogger(AverageRecordSizeUtils.class);
-
-  /**
-   * Obtains the average record size based on records written during previous 
commits. Used for estimating how many
-   * records pack into one file.
-   */
-  static long averageBytesPerRecord(HoodieTimeline commitTimeline, 
HoodieWriteConfig hoodieWriteConfig, CommitMetadataSerDe commitMetadataSerDe) {
-    long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
-    long fileSizeThreshold = (long) 
(hoodieWriteConfig.getRecordSizeEstimationThreshold() * 
hoodieWriteConfig.getParquetSmallFileLimit());
-    if (!commitTimeline.empty()) {
-      // Go over the reverse ordered commits to get a more recent estimate of 
average record size.
-      Iterator<HoodieInstant> instants = 
commitTimeline.getReverseOrderedInstants().iterator();
-      while (instants.hasNext()) {
-        HoodieInstant instant = instants.next();
-        try {
-          HoodieCommitMetadata commitMetadata = commitMetadataSerDe
-              .deserialize(instant, 
commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
-          if (instant.getAction().equals(COMMIT_ACTION) || 
instant.getAction().equals(REPLACE_COMMIT_ACTION)) {
-            long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
-            long totalRecordsWritten = 
commitMetadata.fetchTotalRecordsWritten();
-            if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 
0) {
-              avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / 
totalRecordsWritten);
-              break;
-            }
-          } else if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
-            // lets consider only base files in case of delta commits
-            AtomicLong totalBytesWritten = new AtomicLong(0L);
-            AtomicLong totalRecordsWritten = new AtomicLong(0L);
-            commitMetadata.getWriteStats().stream()
-                .filter(hoodieWriteStat -> FSUtils.isBaseFile(new 
StoragePath(hoodieWriteStat.getPath())))
-                .forEach(hoodieWriteStat -> {
-                  
totalBytesWritten.addAndGet(hoodieWriteStat.getTotalWriteBytes());
-                  
totalRecordsWritten.addAndGet(hoodieWriteStat.getNumWrites());
-                });
-            if (totalBytesWritten.get() > fileSizeThreshold && 
totalRecordsWritten.get() > 0) {
-              avgSize = (long) Math.ceil((1.0 * totalBytesWritten.get()) / 
totalRecordsWritten.get());
-              break;
-            }
-          }
-        } catch (Throwable t) {
-          // make this fail safe.
-          LOG.error("Error trying to compute average bytes/record ", t);
-        }
-      }
-    }
-    return avgSize;
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 146a01d0886..4e2c6bbb49b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -33,6 +33,8 @@ import org.apache.hudi.common.util.NumericUtils;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.estimator.RecordSizeEstimator;
+import org.apache.hudi.estimator.RecordSizeEstimatorFactory;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
 import org.apache.hudi.table.WorkloadStat;
@@ -85,6 +87,7 @@ public class UpsertPartitioner<T> extends 
SparkHoodiePartitioner<T> {
 
   protected final HoodieWriteConfig config;
   private final WriteOperationType operationType;
+  private final RecordSizeEstimator recordSizeEstimator;
 
   public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext 
context, HoodieTable table,
                            HoodieWriteConfig config, WriteOperationType 
operationType) {
@@ -94,6 +97,7 @@ public class UpsertPartitioner<T> extends 
SparkHoodiePartitioner<T> {
     bucketInfoMap = new HashMap<>();
     this.config = config;
     this.operationType = operationType;
+    this.recordSizeEstimator = 
RecordSizeEstimatorFactory.createRecordSizeEstimator(config);
     assignUpdates(profile);
     long totalInserts = 
profile.getInputPartitionPathStatMap().values().stream().mapToLong(stat -> 
stat.getNumInserts()).sum();
     if (!WriteOperationType.isPreppedWriteOperation(operationType) || 
totalInserts > 0) { // skip if its prepped write operation. or if totalInserts 
= 0.
@@ -174,9 +178,9 @@ public class UpsertPartitioner<T> extends 
SparkHoodiePartitioner<T> {
      * may result in OOM by making spark underestimate the actual input record 
sizes.
      */
     TimelineLayout layout = 
TimelineLayout.fromVersion(table.getActiveTimeline().getTimelineLayoutVersion());
-    long averageRecordSize = 
AverageRecordSizeUtils.averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
+    long averageRecordSize = 
recordSizeEstimator.averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
         .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
-        .filterCompletedInstants(), config,  layout.getCommitMetadataSerDe());
+        .filterCompletedInstants(),  layout.getCommitMetadataSerDe());
     LOG.info("AvgRecordSize => " + averageRecordSize);
 
     Map<String, List<SmallFile>> partitionSmallFilesMap =
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
deleted file mode 100644
index 971518a685f..00000000000
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.table.action.commit;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
-import org.apache.hudi.common.table.timeline.versioning.v2.BaseTimelineV2;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Stream;
-
-import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
-import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
-import static 
org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Test average record size estimation.
- */
-public class TestAverageRecordSizeUtils {
-
-  private final HoodieTimeline mockTimeline = mock(HoodieTimeline.class);
-  private static final String PARTITION1 = "partition1";
-  private static final String TEST_WRITE_TOKEN = "1-0-1";
-
-  @ParameterizedTest
-  @MethodSource("testCases")
-  public void testAverageRecordSize(List<Pair<HoodieInstant, 
List<HWriteStat>>> instantSizePairs, long expectedSize) throws IOException {
-    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp")
-        .build();
-    HoodieTableMetaClient metaClient = HoodieTestUtils.init("/tmp");
-    HoodieTimeline commitsTimeline = new BaseTimelineV2();
-    List<HoodieInstant> instants = new ArrayList<>();
-    instantSizePairs.forEach(entry -> {
-      HoodieInstant hoodieInstant = entry.getKey();
-      HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
-      entry.getValue().forEach(hWriteStat -> {
-        HoodieWriteStat writeStat = new HoodieWriteStat();
-        writeStat.setNumWrites(hWriteStat.getTotalRecordsWritten());
-        writeStat.setTotalWriteBytes(hWriteStat.getPerRecordSize() * 
hWriteStat.getTotalRecordsWritten());
-        writeStat.setPath(hWriteStat.getPath());
-        commitMetadata.addWriteStat(PARTITION1, writeStat);
-      });
-      instants.add(hoodieInstant);
-      try {
-        
when(mockTimeline.getInstantDetails(hoodieInstant)).thenReturn(TimelineMetadataUtils.serializeCommitMetadata(COMMIT_METADATA_SER_DE,
 commitMetadata));
-      } catch (IOException e) {
-        throw new RuntimeException("Should not have failed", e);
-      }
-    });
-
-    List<HoodieInstant> reverseOrderInstants = new ArrayList<>(instants);
-    Collections.reverse(reverseOrderInstants);
-    when(mockTimeline.getInstants()).thenReturn(instants);
-    when(mockTimeline.getReverseOrderedInstants()).then(i -> 
reverseOrderInstants.stream());
-    commitsTimeline.setInstants(instants);
-
-    assertEquals(expectedSize, 
AverageRecordSizeUtils.averageBytesPerRecord(mockTimeline, writeConfig, 
COMMIT_METADATA_SER_DE));
-  }
-
-  @Test
-  public void testErrorHandling() throws IOException {
-    int recordSize = 10000;
-    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
-        
.withProps(Collections.singletonMap(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), 
String.valueOf(recordSize)))
-        .build(false);
-    BaseTimelineV2 commitsTimeline = new BaseTimelineV2();
-    List<HoodieInstant> instants = Collections.singletonList(
-        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "1"));
-
-    when(mockTimeline.getInstants()).thenReturn(instants);
-    when(mockTimeline.getReverseOrderedInstants()).then(i -> 
instants.stream());
-    // Simulate a case where the instant details are absent
-    commitsTimeline.setInstants(new ArrayList<>());
-
-    assertEquals(recordSize, 
AverageRecordSizeUtils.averageBytesPerRecord(mockTimeline, writeConfig, 
COMMIT_METADATA_SER_DE));
-  }
-
-  private static String getBaseFileName(String instantTime) {
-    String fileName = UUID.randomUUID().toString();
-    return FSUtils.makeBaseFileName(instantTime, TEST_WRITE_TOKEN, fileName, 
PARQUET.getFileExtension());
-  }
-
-  private static String getLogFileName(String instantTime) {
-    String fileName = UUID.randomUUID().toString();
-    String fullFileName = FSUtils.makeBaseFileName(instantTime, 
TEST_WRITE_TOKEN, fileName, PARQUET.getFileExtension());
-    assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
-    return FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), 
instantTime, 1, TEST_WRITE_TOKEN);
-  }
-
-  static Stream<Arguments> testCases() {
-    Long baseInstant = 20231204194919610L;
-    List<Arguments> arguments = new ArrayList<>();
-    // COW
-    // straight forward. just 1 instant.
-    arguments.add(Arguments.of(
-        
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
 HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L)))), 
100L));
-
-    // two instants. latest instant should be honored
-    arguments.add(Arguments.of(
-        
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
 HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
-            
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 
200L)))), 200L));
-
-    // two instants, while 2nd one is smaller in size so as to not meet the 
threshold. So, 1st one should be honored
-    arguments.add(Arguments.of(
-        
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
 HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
-            
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000L, 
200L)))), 100L));
-
-    // 2nd instance is replace commit and should be honored.
-    arguments.add(Arguments.of(
-        
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
 HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
-            
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 
200L)))), 200L));
-
-    // MOR
-    // for delta commits, only parquet files should be accounted for.
-    arguments.add(Arguments.of(
-        
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
 HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
-            
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 
200L)))), 200L));
-
-    // delta commit has a mix of parquet and log files. only parquet files 
should be accounted for.
-    arguments.add(Arguments.of(
-        
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
 HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 1000000L, 100L))),
-            
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Arrays.asList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L),
-                    new HWriteStat(getLogFileName(String.valueOf(baseInstant + 
100)), 10000000L, 300L)))), 200L));
-
-    // 2nd delta commit only has log files. and so we honor 1st delta commit 
size.
-    arguments.add(Arguments.of(
-        
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
 HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
-            
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Arrays.asList(new 
HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 200L),
-                    new HWriteStat(getLogFileName(String.valueOf(baseInstant + 
100)), 10000000L, 300L)))), 100L));
-
-    // replace commit should be honored.
-    arguments.add(Arguments.of(
-        
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
 HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 1000000L, 100L))),
-            
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
-                Arrays.asList(new 
HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 200L),
-                    new HWriteStat(getLogFileName(String.valueOf(baseInstant + 
100)), 1000000L, 300L))),
-            
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)),
-                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 200)), 1000000L, 
400L)))), 400L));
-    return arguments.stream();
-  }
-
-  static class HWriteStat {
-    private final String path;
-    private final Long totalRecordsWritten;
-    private final Long perRecordSize;
-
-    public HWriteStat(String path, Long totalRecordsWritten, Long 
perRecordSize) {
-      this.path = path;
-      this.totalRecordsWritten = totalRecordsWritten;
-      this.perRecordSize = perRecordSize;
-    }
-
-    public String getPath() {
-      return path;
-    }
-
-    public Long getTotalRecordsWritten() {
-      return totalRecordsWritten;
-    }
-
-    public Long getPerRecordSize() {
-      return perRecordSize;
-    }
-  }
-}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 8995917621e..6c28fb66dcf 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -22,14 +22,10 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
 import org.apache.hudi.avro.model.HoodieCompactionPlan;
 import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
 import org.apache.hudi.common.config.HoodieStorageConfig;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.CompactionTestUtils;
 import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
 import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -52,28 +48,18 @@ import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
 import scala.Tuple2;
 
-import static 
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
-import static 
org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
 import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
 
 public class TestUpsertPartitioner extends HoodieClientTestBase {
 
@@ -112,84 +98,6 @@ public class TestUpsertPartitioner extends 
HoodieClientTestBase {
     return partitioner;
   }
 
-  private static List<HoodieInstant> setupHoodieInstants() {
-    List<HoodieInstant> instants = new ArrayList<>();
-    
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "ts1"));
-    
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "ts2"));
-    
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "ts3"));
-    
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "ts4"));
-    
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, "ts5"));
-    Collections.reverse(instants);
-    return instants;
-  }
-
-  private static List<HoodieWriteStat> generateCommitStatWith(int 
totalRecordsWritten, int totalBytesWritten) {
-    List<HoodieWriteStat> writeStatsList = generateFakeHoodieWriteStat(5);
-    // clear all record and byte stats except for last entry.
-    for (int i = 0; i < writeStatsList.size() - 1; i++) {
-      HoodieWriteStat writeStat = writeStatsList.get(i);
-      writeStat.setNumWrites(0);
-      writeStat.setTotalWriteBytes(0);
-    }
-    HoodieWriteStat lastWriteStat = writeStatsList.get(writeStatsList.size() - 
1);
-    lastWriteStat.setTotalWriteBytes(totalBytesWritten);
-    lastWriteStat.setNumWrites(totalRecordsWritten);
-    return writeStatsList;
-  }
-
-  private static HoodieCommitMetadata generateCommitMetadataWith(int 
totalRecordsWritten, int totalBytesWritten) {
-    List<HoodieWriteStat> fakeHoodieWriteStats = 
generateCommitStatWith(totalRecordsWritten, totalBytesWritten);
-    HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
-    fakeHoodieWriteStats.forEach(stat -> 
commitMetadata.addWriteStat(stat.getPartitionPath(), stat));
-    return commitMetadata;
-  }
-
-  /*
-   * This needs to be a stack so we test all cases when either/both 
recordsWritten ,bytesWritten is zero before a non
-   * zero averageRecordSize can be computed.
-   */
-  private static LinkedList<Option<byte[]>> generateCommitMetadataList() 
throws IOException {
-    LinkedList<Option<byte[]>> commits = new LinkedList<>();
-    // First commit with non zero records and bytes
-    commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE, 
generateCommitMetadataWith(2000, 10000)));
-    // Second commit with non zero records and bytes
-    commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE, 
generateCommitMetadataWith(1500, 7500)));
-    // Third commit with a small file
-    commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE, 
generateCommitMetadataWith(100, 500)));
-    // Fourth commit with both zero records and zero bytes
-    commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE, 
generateCommitMetadataWith(0, 0)));
-    // Fifth commit with zero records
-    commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE, 
generateCommitMetadataWith(0, 1500)));
-    // Sixth commit with zero bytes
-    commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE, 
generateCommitMetadataWith(2500, 0)));
-    return commits;
-  }
-
-  @Test
-  public void testAverageBytesPerRecordForNonEmptyCommitTimeLine() throws 
Exception {
-    HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
-    HoodieWriteConfig config = makeHoodieClientConfigBuilder()
-        
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1000).build())
-        .build();
-    when(commitTimeLine.empty()).thenReturn(false);
-    
when(commitTimeLine.getReverseOrderedInstants()).thenReturn(setupHoodieInstants().stream());
-    LinkedList<Option<byte[]>> commits = generateCommitMetadataList();
-    
when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock
 -> commits.pop());
-    long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500);
-    long actualAvgSize = 
AverageRecordSizeUtils.averageBytesPerRecord(commitTimeLine, config, 
COMMIT_METADATA_SER_DE);
-    assertEquals(expectAvgSize, actualAvgSize);
-  }
-
-  @Test
-  public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws 
Exception {
-    HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
-    HoodieWriteConfig config = makeHoodieClientConfigBuilder().build();
-    when(commitTimeLine.empty()).thenReturn(true);
-    long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate();
-    long actualAvgSize = 
AverageRecordSizeUtils.averageBytesPerRecord(commitTimeLine, config, 
COMMIT_METADATA_SER_DE);
-    assertEquals(expectAvgSize, actualAvgSize);
-  }
-
   @Test
   public void testUpsertPartitioner() throws Exception {
     final String testPartitionPath = "2016/09/26";
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/DefaultInstantComparator.java
 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/DefaultInstantComparator.java
new file mode 100644
index 00000000000..de2937008d6
--- /dev/null
+++ 
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/DefaultInstantComparator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline.versioning;
+
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+
+public class DefaultInstantComparator extends InstantComparatorV2 {
+}
diff --git 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 7f3e6f0ef7e..91a90dd5ed9 100644
--- 
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++ 
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -30,11 +30,13 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.HoodieTableVersion;
 import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.InstantComparator;
 import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
 import org.apache.hudi.common.table.timeline.InstantFileNameParser;
 import org.apache.hudi.common.table.timeline.InstantGenerator;
 import org.apache.hudi.common.table.timeline.TimelineFactory;
 import 
org.apache.hudi.common.table.timeline.versioning.DefaultCommitMetadataSerDe;
+import 
org.apache.hudi.common.table.timeline.versioning.DefaultInstantComparator;
 import 
org.apache.hudi.common.table.timeline.versioning.DefaultInstantFileNameGenerator;
 import 
org.apache.hudi.common.table.timeline.versioning.DefaultInstantFileNameParser;
 import 
org.apache.hudi.common.table.timeline.versioning.DefaultInstantGenerator;
@@ -92,6 +94,7 @@ public class HoodieTestUtils {
   public static final InstantFileNameGenerator INSTANT_FILE_NAME_GENERATOR = 
new DefaultInstantFileNameGenerator();
   public static final InstantFileNameParser INSTANT_FILE_NAME_PARSER = new 
DefaultInstantFileNameParser();
   public static final CommitMetadataSerDe COMMIT_METADATA_SER_DE = new 
DefaultCommitMetadataSerDe();
+  public static final InstantComparator INSTANT_COMPARATOR = new 
DefaultInstantComparator();
 
   public static StorageConfiguration<Configuration> getDefaultStorageConf() {
     return (StorageConfiguration<Configuration>) 
ReflectionUtils.loadClass(HADOOP_STORAGE_CONF,

Reply via email to