This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 2fbb1fc95ec4b9555778cfad24ddc83d94f38b8b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Wed May 15 07:29:51 2024 -0700

    [HUDI-7429] Fixing average record size estimation for delta commits (#10763)
    
    Co-authored-by: Y Ethan Guo <[email protected]>
    Co-authored-by: Jonathan Vexler <=>
---
 .../action/commit/AverageRecordSizeUtils.java      |  91 ++++++++++
 .../table/action/commit/UpsertPartitioner.java     |  41 +----
 .../action/commit/TestAverageRecordSizeUtils.java  | 195 +++++++++++++++++++++
 .../table/action/commit/TestUpsertPartitioner.java |   5 +-
 4 files changed, 294 insertions(+), 38 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
new file mode 100644
index 00000000000..9d9408e173b
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StoragePath;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+
+/**
+ * Util class to assist with fetching average record size.
+ */
+public class AverageRecordSizeUtils {
+  private static final Logger LOG = 
LoggerFactory.getLogger(AverageRecordSizeUtils.class);
+
+  /**
+   * Obtains the average record size based on records written during previous 
commits. Used for estimating how many
+   * records pack into one file.
+   */
+  static long averageBytesPerRecord(HoodieTimeline commitTimeline, 
HoodieWriteConfig hoodieWriteConfig) {
+    long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
+    long fileSizeThreshold = (long) 
(hoodieWriteConfig.getRecordSizeEstimationThreshold() * 
hoodieWriteConfig.getParquetSmallFileLimit());
+    if (!commitTimeline.empty()) {
+      // Go over the reverse ordered commits to get a more recent estimate of 
average record size.
+      Iterator<HoodieInstant> instants = 
commitTimeline.getReverseOrderedInstants().iterator();
+      while (instants.hasNext()) {
+        HoodieInstant instant = instants.next();
+        try {
+          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
+              .fromBytes(commitTimeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
+          if (instant.getAction().equals(COMMIT_ACTION) || 
instant.getAction().equals(REPLACE_COMMIT_ACTION)) {
+            long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
+            long totalRecordsWritten = 
commitMetadata.fetchTotalRecordsWritten();
+            if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 
0) {
+              avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / 
totalRecordsWritten);
+              break;
+            }
+          } else if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
+            // let's consider only base files in case of delta commits
+            AtomicLong totalBytesWritten = new AtomicLong(0L);
+            AtomicLong totalRecordsWritten = new AtomicLong(0L);
+            commitMetadata.getWriteStats().stream()
+                .filter(hoodieWriteStat -> FSUtils.isBaseFile(new 
StoragePath(hoodieWriteStat.getPath())))
+                .forEach(hoodieWriteStat -> {
+                  
totalBytesWritten.addAndGet(hoodieWriteStat.getTotalWriteBytes());
+                  
totalRecordsWritten.addAndGet(hoodieWriteStat.getNumWrites());
+                });
+            if (totalBytesWritten.get() > fileSizeThreshold && 
totalRecordsWritten.get() > 0) {
+              avgSize = (long) Math.ceil((1.0 * totalBytesWritten.get()) / 
totalRecordsWritten.get());
+              break;
+            }
+          }
+        } catch (IOException ioe) {
+          // make this fail safe.
+          LOG.error("Error trying to compute average bytes/record ", ioe);
+        }
+      }
+    }
+    return avgSize;
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 2b78df96765..09904cd290e 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -22,7 +22,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.fs.FSUtils;
 import org.apache.hudi.common.model.HoodieBaseFile;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieWriteStat;
@@ -46,7 +45,6 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -56,6 +54,8 @@ import java.util.stream.Collectors;
 import scala.Tuple2;
 
 import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static 
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
 
 /**
  * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD 
partition).
@@ -170,8 +170,9 @@ public class UpsertPartitioner<T> extends 
SparkHoodiePartitioner<T> {
      * created by clustering, which has smaller average record size, which 
affects assigning inserts and
      * may result in OOM by making spark underestimate the actual input record 
sizes.
      */
-    long averageRecordSize = 
averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
-        
.getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).filterCompletedInstants(),
 config);
+    long averageRecordSize = 
AverageRecordSizeUtils.averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
+        .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
+        .filterCompletedInstants(), config);
     LOG.info("AvgRecordSize => " + averageRecordSize);
 
     Map<String, List<SmallFile>> partitionSmallFilesMap =
@@ -228,7 +229,7 @@ public class UpsertPartitioner<T> extends 
SparkHoodiePartitioner<T> {
         if (totalUnassignedInserts > 0) {
           long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize();
           if (config.shouldAutoTuneInsertSplits()) {
-            insertRecordsPerBucket = config.getParquetMaxFileSize() / 
averageRecordSize;
+            insertRecordsPerBucket = (int) Math.ceil((1.0 * 
config.getParquetMaxFileSize()) / averageRecordSize);
           }
 
           int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / 
insertRecordsPerBucket);
@@ -366,34 +367,4 @@ public class UpsertPartitioner<T> extends 
SparkHoodiePartitioner<T> {
       return targetBuckets.get(0).getKey().bucketNumber;
     }
   }
-
-  /**
-   * Obtains the average record size based on records written during previous 
commits. Used for estimating how many
-   * records pack into one file.
-   */
-  protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, 
HoodieWriteConfig hoodieWriteConfig) {
-    long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
-    long fileSizeThreshold = (long) 
(hoodieWriteConfig.getRecordSizeEstimationThreshold() * 
hoodieWriteConfig.getParquetSmallFileLimit());
-    try {
-      if (!commitTimeline.empty()) {
-        // Go over the reverse ordered commits to get a more recent estimate 
of average record size.
-        Iterator<HoodieInstant> instants = 
commitTimeline.getReverseOrderedInstants().iterator();
-        while (instants.hasNext()) {
-          HoodieInstant instant = instants.next();
-          HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-              .fromBytes(commitTimeline.getInstantDetails(instant).get(), 
HoodieCommitMetadata.class);
-          long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
-          long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten();
-          if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 
0) {
-            avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / 
totalRecordsWritten);
-            break;
-          }
-        }
-      }
-    } catch (Throwable t) {
-      // make this fail safe.
-      LOG.error("Error trying to compute average bytes/record ", t);
-    }
-    return avgSize;
-  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
new file mode 100644
index 00000000000..5db8c978b65
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.table.action.commit;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
+import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
+import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test average record size estimation.
+ */
+public class TestAverageRecordSizeUtils {
+
+  private final HoodieTimeline mockTimeline = mock(HoodieTimeline.class);
+  private static final String PARTITION1 = "partition1";
+  private static final String TEST_WRITE_TOKEN = "1-0-1";
+
+  @ParameterizedTest
+  @MethodSource("testCases")
+  public void testAverageRecordSize(List<Pair<HoodieInstant, 
List<HWriteStat>>> instantSizePairs, long expectedSize) {
+    HoodieWriteConfig writeConfig = 
HoodieWriteConfig.newBuilder().withPath("/tmp")
+        .build();
+    HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline();
+    List<HoodieInstant> instants = new ArrayList<>();
+    instantSizePairs.forEach(entry -> {
+      HoodieInstant hoodieInstant = entry.getKey();
+      HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
+      entry.getValue().forEach(hWriteStat -> {
+        HoodieWriteStat writeStat = new HoodieWriteStat();
+        writeStat.setNumWrites(hWriteStat.getTotalRecordsWritten());
+        writeStat.setTotalWriteBytes(hWriteStat.getPerRecordSize() * 
hWriteStat.getTotalRecordsWritten());
+        writeStat.setPath(hWriteStat.getPath());
+        commitMetadata.addWriteStat(PARTITION1, writeStat);
+      });
+      instants.add(hoodieInstant);
+      try {
+        
when(mockTimeline.getInstantDetails(hoodieInstant)).thenReturn(Option.of(getUTF8Bytes(commitMetadata.toJsonString())));
+      } catch (IOException e) {
+        throw new RuntimeException("Should not have failed", e);
+      }
+    });
+
+    List<HoodieInstant> reverseOrderInstants = new ArrayList<>(instants);
+    Collections.reverse(reverseOrderInstants);
+    when(mockTimeline.getInstants()).thenReturn(instants);
+    when(mockTimeline.getReverseOrderedInstants()).then(i -> 
reverseOrderInstants.stream());
+    commitsTimeline.setInstants(instants);
+
+    assertEquals(expectedSize, 
AverageRecordSizeUtils.averageBytesPerRecord(mockTimeline, writeConfig));
+  }
+
+  private static String getBaseFileName(String instantTime) {
+    String fileName = UUID.randomUUID().toString();
+    return FSUtils.makeBaseFileName(instantTime, TEST_WRITE_TOKEN, fileName, 
PARQUET.getFileExtension());
+  }
+
+  private static String getLogFileName(String instantTime) {
+    String fileName = UUID.randomUUID().toString();
+    String fullFileName = FSUtils.makeBaseFileName(instantTime, 
TEST_WRITE_TOKEN, fileName, PARQUET.getFileExtension());
+    assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
+    return FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), 
instantTime, 1, TEST_WRITE_TOKEN);
+  }
+
+  static Stream<Arguments> testCases() {
+    Long baseInstant = 20231204194919610L;
+    List<Arguments> arguments = new ArrayList<>();
+    // COW
+    // straightforward: just 1 instant.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L)))), 
100L));
+
+    // two instants. latest instant should be honored
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 
200L)))), 200L));
+
+    // two instants; the 2nd one is too small to meet the size threshold, so 
the 1st one should be honored
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000L, 
200L)))), 100L));
+
+    // 2nd instant is a replace commit and should be honored.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 
200L)))), 200L));
+
+    // MOR
+    // for delta commits, only parquet files should be accounted for.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 
200L)))), 200L));
+
+    // delta commit has a mix of parquet and log files. only parquet files 
should be accounted for.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 1000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Arrays.asList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L),
+                    new HWriteStat(getLogFileName(String.valueOf(baseInstant + 
100)), 10000000L, 300L)))), 200L));
+
+    // 2nd delta commit only has log files, so we honor the 1st delta commit's 
size.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Arrays.asList(new 
HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 200L),
+                    new HWriteStat(getLogFileName(String.valueOf(baseInstant + 
100)), 10000000L, 300L)))), 100L));
+
+    // replace commit should be honored.
+    arguments.add(Arguments.of(
+        Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 1000000L, 100L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
+                Arrays.asList(new 
HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 200L),
+                    new HWriteStat(getLogFileName(String.valueOf(baseInstant + 
100)), 1000000L, 300L))),
+            Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, 
HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)),
+                Collections.singletonList(new 
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 200)), 1000000L, 
400L)))), 400L));
+    return arguments.stream();
+  }
+
+  static class HWriteStat {
+    private final String path;
+    private final Long totalRecordsWritten;
+    private final Long perRecordSize;
+
+    public HWriteStat(String path, Long totalRecordsWritten, Long 
perRecordSize) {
+      this.path = path;
+      this.totalRecordsWritten = totalRecordsWritten;
+      this.perRecordSize = perRecordSize;
+    }
+
+    public String getPath() {
+      return path;
+    }
+
+    public Long getTotalRecordsWritten() {
+      return totalRecordsWritten;
+    }
+
+    public Long getPerRecordSize() {
+      return perRecordSize;
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 1ca12aad5b7..12ebd7cee01 100644
--- 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -67,7 +67,6 @@ import static 
org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION
 import static 
org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
 import static 
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
 import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes;
-import static 
org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
@@ -175,7 +174,7 @@ public class TestUpsertPartitioner extends 
HoodieClientTestBase {
     LinkedList<Option<byte[]>> commits = generateCommitMetadataList();
     
when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock
 -> commits.pop());
     long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500);
-    long actualAvgSize = averageBytesPerRecord(commitTimeLine, config);
+    long actualAvgSize = 
AverageRecordSizeUtils.averageBytesPerRecord(commitTimeLine, config);
     assertEquals(expectAvgSize, actualAvgSize);
   }
 
@@ -185,7 +184,7 @@ public class TestUpsertPartitioner extends 
HoodieClientTestBase {
     HoodieWriteConfig config = makeHoodieClientConfigBuilder().build();
     when(commitTimeLine.empty()).thenReturn(true);
     long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate();
-    long actualAvgSize = averageBytesPerRecord(commitTimeLine, config);
+    long actualAvgSize = 
AverageRecordSizeUtils.averageBytesPerRecord(commitTimeLine, config);
     assertEquals(expectAvgSize, actualAvgSize);
   }
 

Reply via email to