This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch branch-0.x in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 2fbb1fc95ec4b9555778cfad24ddc83d94f38b8b Author: Sivabalan Narayanan <[email protected]> AuthorDate: Wed May 15 07:29:51 2024 -0700 [HUDI-7429] Fixing average record size estimation for delta commits (#10763) Co-authored-by: Y Ethan Guo <[email protected]> Co-authored-by: Jonathan Vexler <=> --- .../action/commit/AverageRecordSizeUtils.java | 91 ++++++++++ .../table/action/commit/UpsertPartitioner.java | 41 +---- .../action/commit/TestAverageRecordSizeUtils.java | 195 +++++++++++++++++++++ .../table/action/commit/TestUpsertPartitioner.java | 5 +- 4 files changed, 294 insertions(+), 38 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java new file mode 100644 index 00000000000..9d9408e173b --- /dev/null +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.storage.StoragePath; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.concurrent.atomic.AtomicLong; + +import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; + +/** + * Util class to assist with fetching average record size. + */ +public class AverageRecordSizeUtils { + private static final Logger LOG = LoggerFactory.getLogger(AverageRecordSizeUtils.class); + + /** + * Obtains the average record size based on records written during previous commits. Used for estimating how many + * records pack into one file. + */ + static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) { + long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate(); + long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit()); + if (!commitTimeline.empty()) { + // Go over the reverse ordered commits to get a more recent estimate of average record size. + Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator(); + while (instants.hasNext()) { + HoodieInstant instant = instants.next(); + try { + HoodieCommitMetadata commitMetadata = HoodieCommitMetadata + .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); + if (instant.getAction().equals(COMMIT_ACTION) || instant.getAction().equals(REPLACE_COMMIT_ACTION)) { + long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); + long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); + if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) { + avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten); + break; + } + } else if (instant.getAction().equals(DELTA_COMMIT_ACTION)) { + // lets consider only base files in case of delta commits + AtomicLong totalBytesWritten = new AtomicLong(0L); + AtomicLong totalRecordsWritten = new AtomicLong(0L); + commitMetadata.getWriteStats().stream() + .filter(hoodieWriteStat -> FSUtils.isBaseFile(new StoragePath(hoodieWriteStat.getPath()))) + .forEach(hoodieWriteStat -> { + totalBytesWritten.addAndGet(hoodieWriteStat.getTotalWriteBytes()); + totalRecordsWritten.addAndGet(hoodieWriteStat.getNumWrites()); + }); + if (totalBytesWritten.get() > fileSizeThreshold && totalRecordsWritten.get() > 0) { + avgSize = (long) Math.ceil((1.0 * totalBytesWritten.get()) / totalRecordsWritten.get()); + break; + } + } + } catch (IOException ioe) { + // make this fail safe. + LOG.error("Error trying to compute average bytes/record ", ioe); + } + } + } + return avgSize; + } +} diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java index 2b78df96765..09904cd290e 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java @@ -22,7 +22,6 @@ import org.apache.hudi.client.common.HoodieSparkEngineContext; import org.apache.hudi.common.engine.HoodieEngineContext; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; -import org.apache.hudi.common.model.HoodieCommitMetadata; import org.apache.hudi.common.model.HoodieKey; import org.apache.hudi.common.model.HoodieRecordLocation; import org.apache.hudi.common.model.HoodieWriteStat; @@ -46,7 +45,6 @@ import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; @@ -56,6 +54,8 @@ import java.util.stream.Collectors; import scala.Tuple2; import static org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION; +import static org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION; /** * Packs incoming records to be upserted, into buckets (1 bucket = 1 RDD partition). @@ -170,8 +170,9 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> { * created by clustering, which has smaller average record size, which affects assigning inserts and * may result in OOM by making spark underestimate the actual input record sizes. */ - long averageRecordSize = averageBytesPerRecord(table.getMetaClient().getActiveTimeline() - .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION)).filterCompletedInstants(), config); + long averageRecordSize = AverageRecordSizeUtils.averageBytesPerRecord(table.getMetaClient().getActiveTimeline() + .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION)) + .filterCompletedInstants(), config); LOG.info("AvgRecordSize => " + averageRecordSize); Map<String, List<SmallFile>> partitionSmallFilesMap = @@ -228,7 +229,7 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> { if (totalUnassignedInserts > 0) { long insertRecordsPerBucket = config.getCopyOnWriteInsertSplitSize(); if (config.shouldAutoTuneInsertSplits()) { - insertRecordsPerBucket = config.getParquetMaxFileSize() / averageRecordSize; + insertRecordsPerBucket = (int) Math.ceil((1.0 * config.getParquetMaxFileSize()) / averageRecordSize); } int insertBuckets = (int) Math.ceil((1.0 * totalUnassignedInserts) / insertRecordsPerBucket); @@ -366,34 +367,4 @@ public class UpsertPartitioner<T> extends SparkHoodiePartitioner<T> { return targetBuckets.get(0).getKey().bucketNumber; } } - - /** - * Obtains the average record size based on records written during previous commits. Used for estimating how many - * records pack into one file. - */ - protected static long averageBytesPerRecord(HoodieTimeline commitTimeline, HoodieWriteConfig hoodieWriteConfig) { - long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate(); - long fileSizeThreshold = (long) (hoodieWriteConfig.getRecordSizeEstimationThreshold() * hoodieWriteConfig.getParquetSmallFileLimit()); - try { - if (!commitTimeline.empty()) { - // Go over the reverse ordered commits to get a more recent estimate of average record size. - Iterator<HoodieInstant> instants = commitTimeline.getReverseOrderedInstants().iterator(); - while (instants.hasNext()) { - HoodieInstant instant = instants.next(); - HoodieCommitMetadata commitMetadata = HoodieCommitMetadata - .fromBytes(commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class); - long totalBytesWritten = commitMetadata.fetchTotalBytesWritten(); - long totalRecordsWritten = commitMetadata.fetchTotalRecordsWritten(); - if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten > 0) { - avgSize = (long) Math.ceil((1.0 * totalBytesWritten) / totalRecordsWritten); - break; - } - } - } - } catch (Throwable t) { - // make this fail safe. - LOG.error("Error trying to compute average bytes/record ", t); - } - return avgSize; - } } diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java new file mode 100644 index 00000000000..5db8c978b65 --- /dev/null +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.table.action.commit; + +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; + +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.stream.Stream; + +import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG; +import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET; +import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Test average record size estimation. + */ +public class TestAverageRecordSizeUtils { + + private final HoodieTimeline mockTimeline = mock(HoodieTimeline.class); + private static final String PARTITION1 = "partition1"; + private static final String TEST_WRITE_TOKEN = "1-0-1"; + + @ParameterizedTest + @MethodSource("testCases") + public void testAverageRecordSize(List<Pair<HoodieInstant, List<HWriteStat>>> instantSizePairs, long expectedSize) { + HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp") + .build(); + HoodieDefaultTimeline commitsTimeline = new HoodieDefaultTimeline(); + List<HoodieInstant> instants = new ArrayList<>(); + instantSizePairs.forEach(entry -> { + HoodieInstant hoodieInstant = entry.getKey(); + HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata(); + entry.getValue().forEach(hWriteStat -> { + HoodieWriteStat writeStat = new HoodieWriteStat(); + writeStat.setNumWrites(hWriteStat.getTotalRecordsWritten()); + writeStat.setTotalWriteBytes(hWriteStat.getPerRecordSize() * hWriteStat.getTotalRecordsWritten()); + writeStat.setPath(hWriteStat.getPath()); + commitMetadata.addWriteStat(PARTITION1, writeStat); + }); + instants.add(hoodieInstant); + try { + when(mockTimeline.getInstantDetails(hoodieInstant)).thenReturn(Option.of(getUTF8Bytes(commitMetadata.toJsonString()))); + } catch (IOException e) { + throw new RuntimeException("Should not have failed", e); + } + }); + + List<HoodieInstant> reverseOrderInstants = new ArrayList<>(instants); + Collections.reverse(reverseOrderInstants); + when(mockTimeline.getInstants()).thenReturn(instants); + when(mockTimeline.getReverseOrderedInstants()).then(i -> reverseOrderInstants.stream()); + commitsTimeline.setInstants(instants); + + assertEquals(expectedSize, AverageRecordSizeUtils.averageBytesPerRecord(mockTimeline, writeConfig)); + } + + private static String getBaseFileName(String instantTime) { + String fileName = UUID.randomUUID().toString(); + return FSUtils.makeBaseFileName(instantTime, TEST_WRITE_TOKEN, fileName, PARQUET.getFileExtension()); + } + + private static String getLogFileName(String instantTime) { + String fileName = UUID.randomUUID().toString(); + String fullFileName = FSUtils.makeBaseFileName(instantTime, TEST_WRITE_TOKEN, fileName, PARQUET.getFileExtension()); + assertEquals(instantTime, FSUtils.getCommitTime(fullFileName)); + return FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), instantTime, 1, TEST_WRITE_TOKEN); + } + + static Stream<Arguments> testCases() { + Long baseInstant = 20231204194919610L; + List<Arguments> arguments = new ArrayList<>(); + // COW + // straight forward. just 1 instant. + arguments.add(Arguments.of( + Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L)))), 100L)); + + // two instants. latest instant should be honored + arguments.add(Arguments.of( + Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))), + Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L)))), 200L)); + + // two instants, while 2nd one is smaller in size so as to not meet the threshold. So, 1st one should be honored + arguments.add(Arguments.of( + Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))), + Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000L, 200L)))), 100L)); + + // 2nd instance is replace commit and should be honored. + arguments.add(Arguments.of( + Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))), + Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant + 100)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L)))), 200L)); + + // MOR + // for delta commits, only parquet files should be accounted for. + arguments.add(Arguments.of( + Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))), + Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L)))), 200L)); + + // delta commit has a mix of parquet and log files. only parquet files should be accounted for. + arguments.add(Arguments.of( + Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 1000000L, 100L))), + Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)), + Arrays.asList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L), + new HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 10000000L, 300L)))), 200L)); + + // 2nd delta commit only has log files. and so we honor 1st delta commit size. + arguments.add(Arguments.of( + Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))), + Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)), + Arrays.asList(new HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 200L), + new HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 10000000L, 300L)))), 100L)); + + // replace commit should be honored. + arguments.add(Arguments.of( + Arrays.asList(Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 1000000L, 100L))), + Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)), + Arrays.asList(new HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 200L), + new HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 300L))), + Pair.of(new HoodieInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)), + Collections.singletonList(new HWriteStat(getBaseFileName(String.valueOf(baseInstant + 200)), 1000000L, 400L)))), 400L)); + return arguments.stream(); + } + + static class HWriteStat { + private final String path; + private final Long totalRecordsWritten; + private final Long perRecordSize; + + public HWriteStat(String path, Long totalRecordsWritten, Long perRecordSize) { + this.path = path; + this.totalRecordsWritten = totalRecordsWritten; + this.perRecordSize = perRecordSize; + } + + public String getPath() { + return path; + } + + public Long getTotalRecordsWritten() { + return totalRecordsWritten; + } + + public Long getPerRecordSize() { + return perRecordSize; + } + } +} diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java index 1ca12aad5b7..12ebd7cee01 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java @@ -67,7 +67,6 @@ import static org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION import static org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat; import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource; import static org.apache.hudi.common.util.StringUtils.getUTF8Bytes; -import static org.apache.hudi.table.action.commit.UpsertPartitioner.averageBytesPerRecord; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -175,7 +174,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { LinkedList<Option<byte[]>> commits = generateCommitMetadataList(); when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock -> commits.pop()); long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500); - long actualAvgSize = averageBytesPerRecord(commitTimeLine, config); + long actualAvgSize = AverageRecordSizeUtils.averageBytesPerRecord(commitTimeLine, config); assertEquals(expectAvgSize, actualAvgSize); } @@ -185,7 +184,7 @@ public class TestUpsertPartitioner extends HoodieClientTestBase { HoodieWriteConfig config = makeHoodieClientConfigBuilder().build(); when(commitTimeLine.empty()).thenReturn(true); long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate(); - long actualAvgSize = averageBytesPerRecord(commitTimeLine, config); + long actualAvgSize = AverageRecordSizeUtils.averageBytesPerRecord(commitTimeLine, config); assertEquals(expectAvgSize, actualAvgSize); }
