This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 38feb9be595 [HUDI-8985] Fix avg record size estimator to avoid parsing
all commit metadata of active timeline (#12803)
38feb9be595 is described below
commit 38feb9be5950dbb2b049e5c2f26636dccfd5aeb9
Author: Davis-Zhang-Onehouse
<[email protected]>
AuthorDate: Fri Feb 14 10:30:39 2025 -0800
[HUDI-8985] Fix avg record size estimator to avoid parsing all commit
metadata of active timeline (#12803)
---
.../org/apache/hudi/config/HoodieWriteConfig.java | 51 ++++
.../hudi/estimator/AverageRecordSizeEstimator.java | 134 ++++++++++
.../apache/hudi/estimator/RecordSizeEstimator.java | 48 ++++
.../hudi/estimator/RecordSizeEstimatorFactory.java | 45 ++++
.../estimator/TestAverageRecordSizeEstimator.java | 279 +++++++++++++++++++++
.../estimator/TestRecordSizeEstimatorFactory.java | 72 ++++++
.../action/commit/AverageRecordSizeUtils.java | 91 -------
.../table/action/commit/UpsertPartitioner.java | 8 +-
.../action/commit/TestAverageRecordSizeUtils.java | 219 ----------------
.../table/action/commit/TestUpsertPartitioner.java | 92 -------
.../versioning/DefaultInstantComparator.java | 24 ++
.../hudi/common/testutils/HoodieTestUtils.java | 3 +
12 files changed, 662 insertions(+), 404 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index e57e9b29a83..e420a6e7ad0 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -67,6 +67,7 @@ import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.config.metrics.HoodieMetricsGraphiteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsJmxConfig;
import org.apache.hudi.config.metrics.HoodieMetricsM3Config;
+import org.apache.hudi.estimator.AverageRecordSizeEstimator;
import org.apache.hudi.exception.HoodieNotSupportedException;
import org.apache.hudi.execution.bulkinsert.BulkInsertSortMode;
import org.apache.hudi.index.HoodieIndex;
@@ -211,6 +212,29 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("Key generator class, that implements
`org.apache.hudi.keygen.KeyGenerator` "
+ "extract a key out of incoming records.");
  /**
   * Fully-qualified class name of the {@code org.apache.hudi.estimator.RecordSizeEstimator}
   * implementation used to estimate the size of records being written.
   */
  public static final ConfigProperty<String> RECORD_SIZE_ESTIMATOR_CLASS_NAME = ConfigProperty
      .key("hoodie.record.size.estimator.class")
      .defaultValue(AverageRecordSizeEstimator.class.getName())
      .markAdvanced()
      .sinceVersion("1.0.0")
      .withDocumentation("Class that estimates the size of records written by implementing "
          + "`org.apache.hudi.estimator.RecordSizeEstimator`. Default implementation is `org.apache.hudi.estimator.AverageRecordSizeEstimator`");

  // NOTE(review): key is underscore-prefixed, unlike the sibling keys above — presumably to mark
  // it as an internal/undocumented config; confirm this is intentional.
  public static final ConfigProperty<Integer> RECORD_SIZE_ESTIMATOR_MAX_COMMITS = ConfigProperty
      .key("_hoodie.record.size.estimator.max.commits")
      .defaultValue(5)
      .markAdvanced()
      .sinceVersion("1.0.0")
      .withDocumentation("The maximum number of commits that will be read to estimate the avg record size. "
          + "This makes sure we parse a limited number of commit metadata, as parsing the entire active timeline can be expensive and unnecessary.")

  // NOTE(review): declared as ConfigProperty<String> holding a numeric value; the getter parses it
  // with Long.parseLong, so a non-numeric override fails only at read time. Consider
  // ConfigProperty<Long> with defaultValue(0L) instead.
  public static final ConfigProperty<String> RECORD_SIZE_ESTIMATOR_AVERAGE_METADATA_SIZE = ConfigProperty
      .key("hoodie.record.size.estimator.average.metadata.size")
      .defaultValue("0")
      .markAdvanced()
      .sinceVersion("1.0.0")
      .withDocumentation("The approximate metadata size in bytes to subtract from the file size when estimating the record size.");
public static final ConfigProperty<String> WRITE_EXECUTOR_TYPE =
ConfigProperty
.key("hoodie.write.executor.type")
.defaultValue(ExecutorType.SIMPLE.name())
@@ -1649,6 +1673,18 @@ public class HoodieWriteConfig extends HoodieConfig {
return getInt(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE);
}
  /** @return the configured {@code RecordSizeEstimator} implementation class name. */
  public String getRecordSizeEstimator() {
    return getStringOrDefault(RECORD_SIZE_ESTIMATOR_CLASS_NAME);
  }

  /** @return the maximum number of commits whose metadata is parsed for record size estimation. */
  public int getRecordSizeEstimatorMaxCommits() {
    return getIntOrDefault(RECORD_SIZE_ESTIMATOR_MAX_COMMITS);
  }

  /**
   * @return the approximate per-file metadata size in bytes to subtract during record size estimation.
   *     NOTE(review): throws NumberFormatException if the property is set to a non-numeric value,
   *     since the underlying config is String-typed.
   */
  public long getRecordSizeEstimatorAverageMetadataSize() {
    return Long.parseLong(getStringOrDefault(RECORD_SIZE_ESTIMATOR_AVERAGE_METADATA_SIZE));
  }
+
public boolean allowMultipleCleans() {
return getBoolean(HoodieCleanConfig.ALLOW_MULTIPLE_CLEANS);
}
@@ -3016,6 +3052,21 @@ public class HoodieWriteConfig extends HoodieConfig {
return this;
}
    /** Sets the {@code RecordSizeEstimator} implementation class to use. */
    public Builder withRecordSizeEstimator(String recordSizeEstimator) {
      writeConfig.setValue(RECORD_SIZE_ESTIMATOR_CLASS_NAME, recordSizeEstimator);
      return this;
    }

    /** Caps the number of commits whose metadata is parsed during record size estimation. */
    public Builder withRecordSizeEstimatorMaxCommits(int maxCommits) {
      writeConfig.setValue(RECORD_SIZE_ESTIMATOR_MAX_COMMITS, String.valueOf(maxCommits));
      return this;
    }

    /** Sets the approximate per-file metadata size (bytes) subtracted when estimating record size. */
    public Builder withRecordSizeEstimatorAverageMetadataSize(long avgMetadataSize) {
      writeConfig.setValue(RECORD_SIZE_ESTIMATOR_AVERAGE_METADATA_SIZE, String.valueOf(avgMetadataSize));
      return this;
    }
+
public Builder withExecutorType(String executorClass) {
writeConfig.setValue(WRITE_EXECUTOR_TYPE, executorClass);
return this;
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java
new file mode 100644
index 00000000000..ec43ca76526
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/AverageRecordSizeEstimator.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.estimator;
+
+import org.apache.hudi.common.data.HoodieAtomicLongAccumulator;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.CollectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.storage.StoragePath;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Set;
+import java.util.stream.Stream;
+
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+
+/**
+ * Default Implementation for the {@link RecordSizeEstimator}.
+ * Estimates the average record sizes based on the stats from the previous X
commits and deltacommits.
+ * X is configured using hoodieWriteConfig.getRecordSizeEstimateMaxCommits().
+ * <p>
+ * Currently, we will estimate the avg record sizes only from candidate files
from the commit metadata. Candidate
+ * files are selective files that have a threshold size to avoid measurement
errors. Optionally, we can
+ * configure the expected metadata size of the file so that can be accounted
for.
+ */
+public class AverageRecordSizeEstimator extends RecordSizeEstimator {
+ private static final Logger LOG =
LoggerFactory.getLogger(AverageRecordSizeEstimator.class);
+ /*
+ * NOTE: we only use commit instants to calculate average record size
because replacecommit can be
+ * created by clustering, which has smaller average record size, which
affects assigning inserts and
+ * may result in OOM by making spark underestimate the actual input record
sizes.
+ */
+ private static final Set<String> RECORD_SIZE_ESTIMATE_ACTIONS =
CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION,
COMPACTION_ACTION);
+
+ public AverageRecordSizeEstimator(HoodieWriteConfig writeConfig) {
+ super(writeConfig);
+ }
+
+ @Override
+ public long averageBytesPerRecord(HoodieTimeline commitTimeline,
CommitMetadataSerDe commitMetadataSerDe) {
+ int maxCommits = hoodieWriteConfig.getRecordSizeEstimatorMaxCommits();
+ final AverageRecordSizeStats averageRecordSizeStats = new
AverageRecordSizeStats(hoodieWriteConfig);
+ try {
+ if (!commitTimeline.empty()) {
+ // Go over the reverse ordered commits to get a more recent estimate
of average record size.
+ Stream<HoodieInstant> filteredInstants =
commitTimeline.filterCompletedInstants()
+ .getReverseOrderedInstants()
+ .filter(s -> RECORD_SIZE_ESTIMATE_ACTIONS.contains(s.getAction()))
+ .limit(maxCommits);
+ filteredInstants
+ .forEach(instant -> {
+ HoodieCommitMetadata commitMetadata;
+ try {
+ commitMetadata = commitMetadataSerDe
+ .deserialize(instant,
commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
+ if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
+ // let's consider only base files in case of delta commits
+ commitMetadata.getWriteStats().stream().parallel()
+ .filter(hoodieWriteStat -> FSUtils.isBaseFile(new
StoragePath(hoodieWriteStat.getPath())))
+ .forEach(hoodieWriteStat ->
averageRecordSizeStats.updateStats(hoodieWriteStat.getTotalWriteBytes(),
hoodieWriteStat.getNumWrites()));
+ } else {
+
averageRecordSizeStats.updateStats(commitMetadata.fetchTotalBytesWritten(),
commitMetadata.fetchTotalRecordsWritten());
+ }
+ } catch (IOException ignore) {
+ LOG.info("Failed to parse commit metadata", ignore);
+ }
+ });
+ }
+ } catch (Throwable t) {
+ LOG.warn("Got error while trying to compute average bytes/record but
will proceed to use the computed value "
+ + "or fallback to default config value ", t);
+ }
+ return averageRecordSizeStats.computeAverageRecordSize();
+ }
+
+ private static class AverageRecordSizeStats implements Serializable {
+ private final HoodieAtomicLongAccumulator totalBytesWritten;
+ private final HoodieAtomicLongAccumulator totalRecordsWritten;
+ private final long fileSizeThreshold;
+ private final long avgMetadataSize;
+ private final int defaultRecordSize;
+
+ public AverageRecordSizeStats(HoodieWriteConfig hoodieWriteConfig) {
+ totalBytesWritten = HoodieAtomicLongAccumulator.create();
+ totalRecordsWritten = HoodieAtomicLongAccumulator.create();
+ fileSizeThreshold = (long)
(hoodieWriteConfig.getRecordSizeEstimationThreshold() *
hoodieWriteConfig.getParquetSmallFileLimit());
+ avgMetadataSize =
hoodieWriteConfig.getRecordSizeEstimatorAverageMetadataSize();
+ defaultRecordSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
+ }
+
+ private void updateStats(long fileSizeInBytes, long recordWritten) {
+ if (fileSizeInBytes > fileSizeThreshold && fileSizeInBytes >
avgMetadataSize && recordWritten > 0) {
+ totalBytesWritten.add(fileSizeInBytes - avgMetadataSize);
+ totalRecordsWritten.add(recordWritten);
+ }
+ }
+
+ private long computeAverageRecordSize() {
+ if (totalBytesWritten.value() > 0 && totalRecordsWritten.value() > 0) {
+ return totalBytesWritten.value() / totalRecordsWritten.value();
+ }
+ // Fallback to default implementation in the cases were we either got an
exception before we could
+ // compute the average record size or there are no eligible commits yet.
+ return defaultRecordSize;
+ }
+ }
+}
\ No newline at end of file
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimator.java
new file mode 100644
index 00000000000..c9fc26a30cd
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimator.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.estimator;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.PublicAPIMethod;
+import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import java.io.Serializable;
+
+/**
+ * Abstract class to compute the estimated record size used for assigning
records to spark partitions
+ * for file writing.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public abstract class RecordSizeEstimator implements Serializable {
+
+ protected final HoodieWriteConfig hoodieWriteConfig;
+
+ protected RecordSizeEstimator(HoodieWriteConfig config) {
+ this.hoodieWriteConfig = config;
+ }
+
+ /**
+ * Generate a Hoodie Key out of provided generic record.
+ */
+ @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
+ public abstract long averageBytesPerRecord(HoodieTimeline commitTimeline,
CommitMetadataSerDe commitMetadataSerDe);
+}
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimatorFactory.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimatorFactory.java
new file mode 100644
index 00000000000..6fd99e8ca25
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/estimator/RecordSizeEstimatorFactory.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.estimator;
+
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Factory class to instantiate the configured implementation of {@link
RecordSizeEstimator}.
+ * Since record estimation is best effort, we use the {@link
AverageRecordSizeEstimator} (default implementation)
+ * as a backup in case of fatal exceptions.
+ */
+public class RecordSizeEstimatorFactory {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(RecordSizeEstimatorFactory.class);
+
+ public static RecordSizeEstimator
createRecordSizeEstimator(HoodieWriteConfig writeConfig) {
+ String recordSizeEstimatorClass = writeConfig.getRecordSizeEstimator();
+ try {
+ return (RecordSizeEstimator)
ReflectionUtils.loadClass(recordSizeEstimatorClass, writeConfig);
+ } catch (Throwable e) {
+ LOG.warn("Unable to instantiate the record estimator implementation {}.
Falling back to use default AverageRecordSizeEstimator.\" ",
recordSizeEstimatorClass, e);
+ }
+ return new AverageRecordSizeEstimator(writeConfig);
+ }
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java
new file mode 100644
index 00000000000..5b7610ecdd3
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestAverageRecordSizeEstimator.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.estimator;
+
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.table.timeline.versioning.v2.BaseTimelineV2;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+import java.util.stream.Stream;
+
+import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_COMPARATOR;
+import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
+import static
org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+/**
+ * Test average record size estimation.
+ */
+public class TestAverageRecordSizeEstimator {
+ private static final String BASE_FILE_EXTENSION =
+ HoodieTableConfig.BASE_FILE_FORMAT.defaultValue().getFileExtension();
+ private final HoodieTimeline mockTimeline = mock(HoodieTimeline.class);
+ private final CommitMetadataSerDe mockCommitMetadataSerDe =
mock(CommitMetadataSerDe.class);
+ private static final String PARTITION1 = "partition1";
+ private static final String TEST_WRITE_TOKEN = "1-0-1";
+ private static final Integer DEFAULT_MAX_COMMITS = 2;
+ private static final Integer DEFAULT_MAX_PARQUET_METADATA_SIZE = 1000;
+ private static final Double DEFAULT_RECORD_SIZE_ESTIMATE_THRESHOLD = 0.1;
+
  @Test
  public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws Exception {
    // An empty timeline must fall back to the configured copy-on-write record size estimate.
    HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
    HoodieWriteConfig config = HoodieWriteConfig.newBuilder().withPath("/tmp").build();
    when(commitTimeLine.empty()).thenReturn(true);
    long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate();
    AverageRecordSizeEstimator averageRecordSizeEstimator = new AverageRecordSizeEstimator(config);
    long actualAvgSize = averageRecordSizeEstimator.averageBytesPerRecord(commitTimeLine, COMMIT_METADATA_SER_DE);
    assertEquals(expectAvgSize, actualAvgSize);
  }
+
  @Test
  public void testErrorHandling() {
    // When instant details cannot be read, the estimator must swallow the error and fall back
    // to the configured default record size instead of propagating.
    int recordSize = 10000;
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withProps(Collections.singletonMap(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(), String.valueOf(recordSize)))
        .build(false);
    // NOTE(review): commitsTimeline is built but never passed to the estimator — presumably
    // leftover scaffolding; the mocked mockTimeline is what drives the failure path. Confirm.
    BaseTimelineV2 commitsTimeline = new BaseTimelineV2();
    List<HoodieInstant> instants = Collections.singletonList(
        INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED, HoodieTimeline.COMMIT_ACTION, "1"));

    when(mockTimeline.getInstants()).thenReturn(instants);
    when(mockTimeline.getReverseOrderedInstants()).then(i -> instants.stream());
    // Simulate a case where the instant details are absent
    commitsTimeline.setInstants(new ArrayList<>());
    AverageRecordSizeEstimator averageRecordSizeEstimator = new AverageRecordSizeEstimator(writeConfig);
    long actualAvgSize = averageRecordSizeEstimator.averageBytesPerRecord(mockTimeline, COMMIT_METADATA_SER_DE);
    // The estimator should return the configured fallback size.
    assertEquals(recordSize, actualAvgSize);
  }
+
  /**
   * Drives the estimator against a mocked timeline built from (instant, write stats) pairs and
   * checks the computed average record size against the expected value from {@link #testCases()}.
   */
  @ParameterizedTest
  @MethodSource("testCases")
  public void testAverageRecordSizeWithNonEmptyCommitTimeline(List<Pair<HoodieInstant, List<HWriteStat>>> instantSizePairs, long expectedSize) {
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder().withPath("/tmp")
        .withRecordSizeEstimator(AverageRecordSizeEstimator.class.getName())
        .withRecordSizeEstimatorMaxCommits(DEFAULT_MAX_COMMITS)
        .withRecordSizeEstimatorAverageMetadataSize(DEFAULT_MAX_PARQUET_METADATA_SIZE)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .compactionRecordSizeEstimateThreshold(DEFAULT_RECORD_SIZE_ESTIMATE_THRESHOLD)
            .build())
        .build();

    // Convert each HWriteStat fixture into a HoodieWriteStat and stub the mocked timeline/serde
    // to return the corresponding commit metadata per instant.
    List<HoodieInstant> instants = new ArrayList<>();
    instantSizePairs.forEach(entry -> {
      HoodieInstant hoodieInstant = entry.getKey();
      HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
      entry.getValue().forEach(hWriteStat -> {
        HoodieWriteStat writeStat = new HoodieWriteStat();
        writeStat.setNumWrites(hWriteStat.getTotalRecordsWritten());
        // Total bytes are derived from the fixture's per-record size.
        writeStat.setTotalWriteBytes(hWriteStat.getPerRecordSize() * hWriteStat.getTotalRecordsWritten());
        writeStat.setPath(hWriteStat.getPath());
        commitMetadata.addWriteStat(PARTITION1, writeStat);
      });
      instants.add(hoodieInstant);
      try {
        when(mockTimeline.getInstantDetails(hoodieInstant))
            .thenReturn(org.apache.hudi.common.util.Option.of(commitMetadata.toJsonString().getBytes(StandardCharsets.UTF_8)));
        when(mockCommitMetadataSerDe.deserialize(hoodieInstant, mockTimeline.getInstantDetails(hoodieInstant).get(), HoodieCommitMetadata.class))
            .thenReturn(commitMetadata);
      } catch (IOException e) {
        throw new RuntimeException("Should not have failed", e);
      }
    });

    // The estimator walks instants newest-first, so serve them in reverse order.
    List<HoodieInstant> reverseOrderInstants = new ArrayList<>(instants);
    Collections.reverse(reverseOrderInstants);

    when(mockTimeline.filterCompletedInstants()).thenReturn(mockTimeline);
    when(mockTimeline.getReverseOrderedInstants()).then(i -> reverseOrderInstants.stream());

    AverageRecordSizeEstimator averageRecordSizeEstimator = new AverageRecordSizeEstimator(writeConfig);
    long actualSize = averageRecordSizeEstimator.averageBytesPerRecord(mockTimeline, mockCommitMetadataSerDe);
    assertEquals(expectedSize, actualSize);
  }
+
  /** Generates a random base-file name carrying the given instant time. */
  private static String getBaseFileName(String instantTime) {
    String fileName = UUID.randomUUID().toString();
    return FSUtils.makeBaseFileName(instantTime, TEST_WRITE_TOKEN, fileName, BASE_FILE_EXTENSION);
  }

  /** Generates a random log-file name carrying the given instant time. */
  private static String getLogFileName(String instantTime) {
    String fileName = UUID.randomUUID().toString();
    String fullFileName = FSUtils.makeBaseFileName(instantTime, TEST_WRITE_TOKEN, fileName, BASE_FILE_EXTENSION);
    // Sanity check that the commit time round-trips through the base-file naming scheme.
    assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
    return FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(), instantTime, 1, TEST_WRITE_TOKEN);
  }
+
+ private static Stream<Arguments> testCases() {
+ Long baseInstant = 20231204194919610L;
+ List<Arguments> arguments = new ArrayList<>();
+ // Note the avg record estimate is based on a parquet metadata size of
500Bytes per file.
+ // 1. straight forward. just 1 instant.
+ arguments.add(Arguments.of(
+
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant,
10000000L, 100L)))), 99L));
+
+ // 2. two instants. avg of both the instants
+ arguments.add(Arguments.of(
+
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant,
10000000L, 100L))),
+ Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant + 100)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant +
100, 1000000L, 200L)))), 109L));
+
+ // 3. two instants, latest commit has a small file thats just above
threshold, while earliest commit is fully ignored,
+ // since it below the threshold size limit
+ arguments.add(Arguments.of(
+
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant,
9000L, 1000L))),
+
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
Long.toString(baseInstant + 100)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant +
100, 110000, 100L)))), 99L));
+
+ // 4. 2nd instance is replace commit, it shld be excluded and should be
avg of both commits.
+ arguments.add(Arguments.of(
+
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant,
10000000L, 100L))),
+
Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION,
Long.toString(baseInstant + 100)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant +
100, 10000000L, 200L)))), 99L));
+
+ // 5. for delta commits, only parquet files should be accounted for.
+ arguments.add(Arguments.of(
+
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant,
10000000L, 100L))),
+
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
Long.toString(baseInstant + 100)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant +
100, 10000000L, 200L)))), 149L));
+
+ // 6. delta commit has a mix of parquet and log files. only parquet files
should be accounted for.
+ arguments.add(Arguments.of(
+
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant,
1000000L, 100L))),
+
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
Long.toString(baseInstant + 100)),
+ Arrays.asList(generateBaseWriteStat(baseInstant + 100,
10000000L, 200L),
+ generateLogWriteStat(baseInstant + 100, 10000000L,
300L)))), 190L));
+
+ // 7. 2nd delta commit only has log files. and so we honor 1st delta
commit size.
+ arguments.add(Arguments.of(
+
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant,
10000000L, 100L))),
+
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
Long.toString(baseInstant + 100)),
+ Arrays.asList(generateLogWriteStat(baseInstant + 100,
1000000L, 200L),
+ generateLogWriteStat(baseInstant + 100, 10000000L,
300L)))), 99L));
+
+ // 8. since default max commits is overriden to 2 commits, ignore the
earliest commit here since there are total 3 commits
+ arguments.add(Arguments.of(
+
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant,
10000000L, 1000L))),
+ Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant + 100)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant +
100, 10000000L, 50L))),
+
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
Long.toString(baseInstant + 200)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant +
200, 10000000L, 100L)))), 74L));
+
+ // 9. replace commits should be ignored despite being the latest commits.
+ arguments.add(Arguments.of(
+
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant,
1000000L, 100L))),
+
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
Long.toString(baseInstant + 100)),
+ Arrays.asList(generateLogWriteStat(baseInstant + 100,
1000000L, 200L),
+ generateLogWriteStat(baseInstant + 100, 1000000L, 300L))),
+
Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant +
200, 1000000L, 2000L))),
+
Pair.of(generateCompletedInstant(HoodieTimeline.REPLACE_COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant +
300, 1000000L, 3000L)))), 99L));
+
+ // 10. Ignore commit stat with 0 records
+ arguments.add(Arguments.of(
+
Arrays.asList(Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant,
10000000L, 1000L))),
+ Pair.of(generateCompletedInstant(HoodieTimeline.COMMIT_ACTION,
Long.toString(baseInstant + 100)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant +
100, 10000000L, 50L))),
+
Pair.of(generateCompletedInstant(HoodieTimeline.DELTA_COMMIT_ACTION,
Long.toString(baseInstant + 200)),
+ Collections.singletonList(generateBaseWriteStat(baseInstant +
200, 0L, 1000L)))), 49L));
+
+ return arguments.stream();
+ }
+
  /** Builds a COMPLETED instant for the given action and instant time. */
  private static HoodieInstant generateCompletedInstant(String action, String instant) {
    return new HoodieInstant(HoodieInstant.State.COMPLETED, action, instant, INSTANT_COMPARATOR.requestedTimeOrderedComparator());
  }

  /** Builds a write-stat fixture for a base (parquet) file: instant time, record count, per-record size. */
  private static HWriteStat generateBaseWriteStat(long instant, long totalRecordsWritten, long perRecordSize) {
    return new HWriteStat(getBaseFileName(String.valueOf(instant)), totalRecordsWritten, perRecordSize);
  }

  /** Builds a write-stat fixture for a log file (ignored by the estimator in delta commits). */
  private static HWriteStat generateLogWriteStat(long instant, long totalRecordsWritten, long perRecordSize) {
    return new HWriteStat(getLogFileName(String.valueOf(instant)), totalRecordsWritten, perRecordSize);
  }
+
  /**
   * Lightweight test fixture describing one written file: its path, the number of records
   * written, and the intended per-record size (the test derives total bytes as
   * records * perRecordSize).
   */
  static class HWriteStat {
    private final String path;
    private final Long totalRecordsWritten;
    private final Long perRecordSize;

    public HWriteStat(String path, Long totalRecordsWritten, Long perRecordSize) {
      this.path = path;
      this.totalRecordsWritten = totalRecordsWritten;
      this.perRecordSize = perRecordSize;
    }

    public String getPath() {
      return path;
    }

    public Long getTotalRecordsWritten() {
      return totalRecordsWritten;
    }

    public Long getPerRecordSize() {
      return perRecordSize;
    }
  }
+}
diff --git
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestRecordSizeEstimatorFactory.java
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestRecordSizeEstimatorFactory.java
new file mode 100644
index 00000000000..283c9f306ce
--- /dev/null
+++
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/estimator/TestRecordSizeEstimatorFactory.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.estimator;
+
+import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.parquet.Strings;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class TestRecordSizeEstimatorFactory {
+
+ @ParameterizedTest
+ @MethodSource("differentEstimatorImplementations")
+ public void testRecordSizeEstimatorFactoryWithCustomConfig(String className,
Class<?> clazz) {
+ HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+ .withPath("/tmp");
+ if (!Strings.isNullOrEmpty(className)) {
+ builder.withRecordSizeEstimator(className);
+ }
+
+ RecordSizeEstimator estimator =
RecordSizeEstimatorFactory.createRecordSizeEstimator(builder.build());
+ assertTrue(clazz.isInstance(estimator));
+ }
+
+ static Stream<Arguments> differentEstimatorImplementations() {
+ List<Arguments> arguments = new ArrayList<>();
+ // Explicitly configured
+ arguments.add(Arguments.of(TestRecordSizeEstimator.class.getName(),
TestRecordSizeEstimator.class));
+ // Not configured
+ arguments.add(Arguments.of(null, AverageRecordSizeEstimator.class));
+ // Incorrectly configured
+
arguments.add(Arguments.of("org.apache.hudi.estimator.IncorrectRecordEstimator",
AverageRecordSizeEstimator.class));
+ return arguments.stream();
+ }
+
+ public static class TestRecordSizeEstimator extends RecordSizeEstimator {
+ public TestRecordSizeEstimator(HoodieWriteConfig config) {
+ super(config);
+ }
+
+ @Override
+ public long averageBytesPerRecord(HoodieTimeline commitTimeline,
CommitMetadataSerDe commitMetadataSerDe) {
+ return 0;
+ }
+ }
+}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
deleted file mode 100644
index 8873fb01c97..00000000000
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/AverageRecordSizeUtils.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.table.action.commit;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.storage.StoragePath;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicLong;
-
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
-import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
-
-/**
- * Util class to assist with fetching average record size.
- */
-public class AverageRecordSizeUtils {
- private static final Logger LOG =
LoggerFactory.getLogger(AverageRecordSizeUtils.class);
-
- /**
- * Obtains the average record size based on records written during previous
commits. Used for estimating how many
- * records pack into one file.
- */
- static long averageBytesPerRecord(HoodieTimeline commitTimeline,
HoodieWriteConfig hoodieWriteConfig, CommitMetadataSerDe commitMetadataSerDe) {
- long avgSize = hoodieWriteConfig.getCopyOnWriteRecordSizeEstimate();
- long fileSizeThreshold = (long)
(hoodieWriteConfig.getRecordSizeEstimationThreshold() *
hoodieWriteConfig.getParquetSmallFileLimit());
- if (!commitTimeline.empty()) {
- // Go over the reverse ordered commits to get a more recent estimate of
average record size.
- Iterator<HoodieInstant> instants =
commitTimeline.getReverseOrderedInstants().iterator();
- while (instants.hasNext()) {
- HoodieInstant instant = instants.next();
- try {
- HoodieCommitMetadata commitMetadata = commitMetadataSerDe
- .deserialize(instant,
commitTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class);
- if (instant.getAction().equals(COMMIT_ACTION) ||
instant.getAction().equals(REPLACE_COMMIT_ACTION)) {
- long totalBytesWritten = commitMetadata.fetchTotalBytesWritten();
- long totalRecordsWritten =
commitMetadata.fetchTotalRecordsWritten();
- if (totalBytesWritten > fileSizeThreshold && totalRecordsWritten >
0) {
- avgSize = (long) Math.ceil((1.0 * totalBytesWritten) /
totalRecordsWritten);
- break;
- }
- } else if (instant.getAction().equals(DELTA_COMMIT_ACTION)) {
- // lets consider only base files in case of delta commits
- AtomicLong totalBytesWritten = new AtomicLong(0L);
- AtomicLong totalRecordsWritten = new AtomicLong(0L);
- commitMetadata.getWriteStats().stream()
- .filter(hoodieWriteStat -> FSUtils.isBaseFile(new
StoragePath(hoodieWriteStat.getPath())))
- .forEach(hoodieWriteStat -> {
-
totalBytesWritten.addAndGet(hoodieWriteStat.getTotalWriteBytes());
-
totalRecordsWritten.addAndGet(hoodieWriteStat.getNumWrites());
- });
- if (totalBytesWritten.get() > fileSizeThreshold &&
totalRecordsWritten.get() > 0) {
- avgSize = (long) Math.ceil((1.0 * totalBytesWritten.get()) /
totalRecordsWritten.get());
- break;
- }
- }
- } catch (Throwable t) {
- // make this fail safe.
- LOG.error("Error trying to compute average bytes/record ", t);
- }
- }
- }
- return avgSize;
- }
-}
diff --git
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index 146a01d0886..4e2c6bbb49b 100644
---
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -33,6 +33,8 @@ import org.apache.hudi.common.util.NumericUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.estimator.RecordSizeEstimator;
+import org.apache.hudi.estimator.RecordSizeEstimatorFactory;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
@@ -85,6 +87,7 @@ public class UpsertPartitioner<T> extends
SparkHoodiePartitioner<T> {
protected final HoodieWriteConfig config;
private final WriteOperationType operationType;
+ private final RecordSizeEstimator recordSizeEstimator;
public UpsertPartitioner(WorkloadProfile profile, HoodieEngineContext
context, HoodieTable table,
HoodieWriteConfig config, WriteOperationType
operationType) {
@@ -94,6 +97,7 @@ public class UpsertPartitioner<T> extends
SparkHoodiePartitioner<T> {
bucketInfoMap = new HashMap<>();
this.config = config;
this.operationType = operationType;
+ this.recordSizeEstimator =
RecordSizeEstimatorFactory.createRecordSizeEstimator(config);
assignUpdates(profile);
long totalInserts =
profile.getInputPartitionPathStatMap().values().stream().mapToLong(stat ->
stat.getNumInserts()).sum();
if (!WriteOperationType.isPreppedWriteOperation(operationType) ||
totalInserts > 0) { // skip if its prepped write operation. or if totalInserts
= 0.
@@ -174,9 +178,9 @@ public class UpsertPartitioner<T> extends
SparkHoodiePartitioner<T> {
* may result in OOM by making spark underestimate the actual input record
sizes.
*/
TimelineLayout layout =
TimelineLayout.fromVersion(table.getActiveTimeline().getTimelineLayoutVersion());
- long averageRecordSize =
AverageRecordSizeUtils.averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
+ long averageRecordSize =
recordSizeEstimator.averageBytesPerRecord(table.getMetaClient().getActiveTimeline()
.getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION,
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION))
- .filterCompletedInstants(), config, layout.getCommitMetadataSerDe());
+ .filterCompletedInstants(), layout.getCommitMetadataSerDe());
LOG.info("AvgRecordSize => " + averageRecordSize);
Map<String, List<SmallFile>> partitionSmallFilesMap =
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
deleted file mode 100644
index 971518a685f..00000000000
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestAverageRecordSizeUtils.java
+++ /dev/null
@@ -1,219 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.hudi.table.action.commit;
-
-import org.apache.hudi.common.fs.FSUtils;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
-import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.TimelineMetadataUtils;
-import org.apache.hudi.common.table.timeline.versioning.v2.BaseTimelineV2;
-import org.apache.hudi.common.testutils.HoodieTestUtils;
-import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieWriteConfig;
-
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.UUID;
-import java.util.stream.Stream;
-
-import static org.apache.hudi.common.model.HoodieFileFormat.HOODIE_LOG;
-import static org.apache.hudi.common.model.HoodieFileFormat.PARQUET;
-import static
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
-import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
-import static
org.apache.hudi.config.HoodieCompactionConfig.COPY_ON_WRITE_RECORD_SIZE_ESTIMATE;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * Test average record size estimation.
- */
-public class TestAverageRecordSizeUtils {
-
- private final HoodieTimeline mockTimeline = mock(HoodieTimeline.class);
- private static final String PARTITION1 = "partition1";
- private static final String TEST_WRITE_TOKEN = "1-0-1";
-
- @ParameterizedTest
- @MethodSource("testCases")
- public void testAverageRecordSize(List<Pair<HoodieInstant,
List<HWriteStat>>> instantSizePairs, long expectedSize) throws IOException {
- HoodieWriteConfig writeConfig =
HoodieWriteConfig.newBuilder().withPath("/tmp")
- .build();
- HoodieTableMetaClient metaClient = HoodieTestUtils.init("/tmp");
- HoodieTimeline commitsTimeline = new BaseTimelineV2();
- List<HoodieInstant> instants = new ArrayList<>();
- instantSizePairs.forEach(entry -> {
- HoodieInstant hoodieInstant = entry.getKey();
- HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
- entry.getValue().forEach(hWriteStat -> {
- HoodieWriteStat writeStat = new HoodieWriteStat();
- writeStat.setNumWrites(hWriteStat.getTotalRecordsWritten());
- writeStat.setTotalWriteBytes(hWriteStat.getPerRecordSize() *
hWriteStat.getTotalRecordsWritten());
- writeStat.setPath(hWriteStat.getPath());
- commitMetadata.addWriteStat(PARTITION1, writeStat);
- });
- instants.add(hoodieInstant);
- try {
-
when(mockTimeline.getInstantDetails(hoodieInstant)).thenReturn(TimelineMetadataUtils.serializeCommitMetadata(COMMIT_METADATA_SER_DE,
commitMetadata));
- } catch (IOException e) {
- throw new RuntimeException("Should not have failed", e);
- }
- });
-
- List<HoodieInstant> reverseOrderInstants = new ArrayList<>(instants);
- Collections.reverse(reverseOrderInstants);
- when(mockTimeline.getInstants()).thenReturn(instants);
- when(mockTimeline.getReverseOrderedInstants()).then(i ->
reverseOrderInstants.stream());
- commitsTimeline.setInstants(instants);
-
- assertEquals(expectedSize,
AverageRecordSizeUtils.averageBytesPerRecord(mockTimeline, writeConfig,
COMMIT_METADATA_SER_DE));
- }
-
- @Test
- public void testErrorHandling() throws IOException {
- int recordSize = 10000;
- HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
-
.withProps(Collections.singletonMap(COPY_ON_WRITE_RECORD_SIZE_ESTIMATE.key(),
String.valueOf(recordSize)))
- .build(false);
- BaseTimelineV2 commitsTimeline = new BaseTimelineV2();
- List<HoodieInstant> instants = Collections.singletonList(
- INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "1"));
-
- when(mockTimeline.getInstants()).thenReturn(instants);
- when(mockTimeline.getReverseOrderedInstants()).then(i ->
instants.stream());
- // Simulate a case where the instant details are absent
- commitsTimeline.setInstants(new ArrayList<>());
-
- assertEquals(recordSize,
AverageRecordSizeUtils.averageBytesPerRecord(mockTimeline, writeConfig,
COMMIT_METADATA_SER_DE));
- }
-
- private static String getBaseFileName(String instantTime) {
- String fileName = UUID.randomUUID().toString();
- return FSUtils.makeBaseFileName(instantTime, TEST_WRITE_TOKEN, fileName,
PARQUET.getFileExtension());
- }
-
- private static String getLogFileName(String instantTime) {
- String fileName = UUID.randomUUID().toString();
- String fullFileName = FSUtils.makeBaseFileName(instantTime,
TEST_WRITE_TOKEN, fileName, PARQUET.getFileExtension());
- assertEquals(instantTime, FSUtils.getCommitTime(fullFileName));
- return FSUtils.makeLogFileName(fileName, HOODIE_LOG.getFileExtension(),
instantTime, 1, TEST_WRITE_TOKEN);
- }
-
- static Stream<Arguments> testCases() {
- Long baseInstant = 20231204194919610L;
- List<Arguments> arguments = new ArrayList<>();
- // COW
- // straight forward. just 1 instant.
- arguments.add(Arguments.of(
-
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L)))),
100L));
-
- // two instants. latest instant should be honored
- arguments.add(Arguments.of(
-
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
-
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant + 100)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L,
200L)))), 200L));
-
- // two instants, while 2nd one is smaller in size so as to not meet the
threshold. So, 1st one should be honored
- arguments.add(Arguments.of(
-
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
-
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000L,
200L)))), 100L));
-
- // 2nd instance is replace commit and should be honored.
- arguments.add(Arguments.of(
-
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
-
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant + 100)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L,
200L)))), 200L));
-
- // MOR
- // for delta commits, only parquet files should be accounted for.
- arguments.add(Arguments.of(
-
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, Long.toString(baseInstant)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
-
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L,
200L)))), 200L));
-
- // delta commit has a mix of parquet and log files. only parquet files
should be accounted for.
- arguments.add(Arguments.of(
-
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 1000000L, 100L))),
-
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
- Arrays.asList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 100)), 10000000L, 200L),
- new HWriteStat(getLogFileName(String.valueOf(baseInstant +
100)), 10000000L, 300L)))), 200L));
-
- // 2nd delta commit only has log files. and so we honor 1st delta commit
size.
- arguments.add(Arguments.of(
-
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 10000000L, 100L))),
-
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
- Arrays.asList(new
HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 200L),
- new HWriteStat(getLogFileName(String.valueOf(baseInstant +
100)), 10000000L, 300L)))), 100L));
-
- // replace commit should be honored.
- arguments.add(Arguments.of(
-
Arrays.asList(Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant)), 1000000L, 100L))),
-
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.DELTA_COMMIT_ACTION, Long.toString(baseInstant + 100)),
- Arrays.asList(new
HWriteStat(getLogFileName(String.valueOf(baseInstant + 100)), 1000000L, 200L),
- new HWriteStat(getLogFileName(String.valueOf(baseInstant +
100)), 1000000L, 300L))),
-
Pair.of(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.REPLACE_COMMIT_ACTION, Long.toString(baseInstant)),
- Collections.singletonList(new
HWriteStat(getBaseFileName(String.valueOf(baseInstant + 200)), 1000000L,
400L)))), 400L));
- return arguments.stream();
- }
-
- static class HWriteStat {
- private final String path;
- private final Long totalRecordsWritten;
- private final Long perRecordSize;
-
- public HWriteStat(String path, Long totalRecordsWritten, Long
perRecordSize) {
- this.path = path;
- this.totalRecordsWritten = totalRecordsWritten;
- this.perRecordSize = perRecordSize;
- }
-
- public String getPath() {
- return path;
- }
-
- public Long getTotalRecordsWritten() {
- return totalRecordsWritten;
- }
-
- public Long getPerRecordSize() {
- return perRecordSize;
- }
- }
-}
diff --git
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
index 8995917621e..6c28fb66dcf 100644
---
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
+++
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/commit/TestUpsertPartitioner.java
@@ -22,14 +22,10 @@ import org.apache.hudi.avro.model.HoodieClusteringPlan;
import org.apache.hudi.avro.model.HoodieCompactionPlan;
import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata;
import org.apache.hudi.common.config.HoodieStorageConfig;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
-import org.apache.hudi.common.model.HoodieWriteStat;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.testutils.CompactionTestUtils;
import org.apache.hudi.common.testutils.FileCreateUtilsLegacy;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
@@ -52,28 +48,18 @@ import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import scala.Tuple2;
-import static
org.apache.hudi.common.table.timeline.TimelineMetadataUtils.serializeCommitMetadata;
-import static
org.apache.hudi.common.testutils.HoodieTestUtils.COMMIT_METADATA_SER_DE;
import static
org.apache.hudi.common.testutils.HoodieTestUtils.DEFAULT_PARTITION_PATHS;
-import static
org.apache.hudi.common.testutils.HoodieTestUtils.INSTANT_GENERATOR;
-import static
org.apache.hudi.common.testutils.HoodieTestUtils.generateFakeHoodieWriteStat;
import static
org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class TestUpsertPartitioner extends HoodieClientTestBase {
@@ -112,84 +98,6 @@ public class TestUpsertPartitioner extends
HoodieClientTestBase {
return partitioner;
}
- private static List<HoodieInstant> setupHoodieInstants() {
- List<HoodieInstant> instants = new ArrayList<>();
-
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "ts1"));
-
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "ts2"));
-
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "ts3"));
-
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "ts4"));
-
instants.add(INSTANT_GENERATOR.createNewInstant(HoodieInstant.State.COMPLETED,
HoodieTimeline.COMMIT_ACTION, "ts5"));
- Collections.reverse(instants);
- return instants;
- }
-
- private static List<HoodieWriteStat> generateCommitStatWith(int
totalRecordsWritten, int totalBytesWritten) {
- List<HoodieWriteStat> writeStatsList = generateFakeHoodieWriteStat(5);
- // clear all record and byte stats except for last entry.
- for (int i = 0; i < writeStatsList.size() - 1; i++) {
- HoodieWriteStat writeStat = writeStatsList.get(i);
- writeStat.setNumWrites(0);
- writeStat.setTotalWriteBytes(0);
- }
- HoodieWriteStat lastWriteStat = writeStatsList.get(writeStatsList.size() -
1);
- lastWriteStat.setTotalWriteBytes(totalBytesWritten);
- lastWriteStat.setNumWrites(totalRecordsWritten);
- return writeStatsList;
- }
-
- private static HoodieCommitMetadata generateCommitMetadataWith(int
totalRecordsWritten, int totalBytesWritten) {
- List<HoodieWriteStat> fakeHoodieWriteStats =
generateCommitStatWith(totalRecordsWritten, totalBytesWritten);
- HoodieCommitMetadata commitMetadata = new HoodieCommitMetadata();
- fakeHoodieWriteStats.forEach(stat ->
commitMetadata.addWriteStat(stat.getPartitionPath(), stat));
- return commitMetadata;
- }
-
- /*
- * This needs to be a stack so we test all cases when either/both
recordsWritten ,bytesWritten is zero before a non
- * zero averageRecordSize can be computed.
- */
- private static LinkedList<Option<byte[]>> generateCommitMetadataList()
throws IOException {
- LinkedList<Option<byte[]>> commits = new LinkedList<>();
- // First commit with non zero records and bytes
- commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE,
generateCommitMetadataWith(2000, 10000)));
- // Second commit with non zero records and bytes
- commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE,
generateCommitMetadataWith(1500, 7500)));
- // Third commit with a small file
- commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE,
generateCommitMetadataWith(100, 500)));
- // Fourth commit with both zero records and zero bytes
- commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE,
generateCommitMetadataWith(0, 0)));
- // Fifth commit with zero records
- commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE,
generateCommitMetadataWith(0, 1500)));
- // Sixth commit with zero bytes
- commits.push(serializeCommitMetadata(COMMIT_METADATA_SER_DE,
generateCommitMetadataWith(2500, 0)));
- return commits;
- }
-
- @Test
- public void testAverageBytesPerRecordForNonEmptyCommitTimeLine() throws
Exception {
- HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
- HoodieWriteConfig config = makeHoodieClientConfigBuilder()
-
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1000).build())
- .build();
- when(commitTimeLine.empty()).thenReturn(false);
-
when(commitTimeLine.getReverseOrderedInstants()).thenReturn(setupHoodieInstants().stream());
- LinkedList<Option<byte[]>> commits = generateCommitMetadataList();
-
when(commitTimeLine.getInstantDetails(any(HoodieInstant.class))).thenAnswer(invocationOnMock
-> commits.pop());
- long expectAvgSize = (long) Math.ceil((1.0 * 7500) / 1500);
- long actualAvgSize =
AverageRecordSizeUtils.averageBytesPerRecord(commitTimeLine, config,
COMMIT_METADATA_SER_DE);
- assertEquals(expectAvgSize, actualAvgSize);
- }
-
- @Test
- public void testAverageBytesPerRecordForEmptyCommitTimeLine() throws
Exception {
- HoodieTimeline commitTimeLine = mock(HoodieTimeline.class);
- HoodieWriteConfig config = makeHoodieClientConfigBuilder().build();
- when(commitTimeLine.empty()).thenReturn(true);
- long expectAvgSize = config.getCopyOnWriteRecordSizeEstimate();
- long actualAvgSize =
AverageRecordSizeUtils.averageBytesPerRecord(commitTimeLine, config,
COMMIT_METADATA_SER_DE);
- assertEquals(expectAvgSize, actualAvgSize);
- }
-
@Test
public void testUpsertPartitioner() throws Exception {
final String testPartitionPath = "2016/09/26";
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/DefaultInstantComparator.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/DefaultInstantComparator.java
new file mode 100644
index 00000000000..de2937008d6
--- /dev/null
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/timeline/versioning/DefaultInstantComparator.java
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.timeline.versioning;
+
+import org.apache.hudi.common.table.timeline.versioning.v2.InstantComparatorV2;
+
+public class DefaultInstantComparator extends InstantComparatorV2 {
+}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
index 7f3e6f0ef7e..91a90dd5ed9 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestUtils.java
@@ -30,11 +30,13 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.HoodieTableVersion;
import org.apache.hudi.common.table.timeline.CommitMetadataSerDe;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.InstantComparator;
import org.apache.hudi.common.table.timeline.InstantFileNameGenerator;
import org.apache.hudi.common.table.timeline.InstantFileNameParser;
import org.apache.hudi.common.table.timeline.InstantGenerator;
import org.apache.hudi.common.table.timeline.TimelineFactory;
import
org.apache.hudi.common.table.timeline.versioning.DefaultCommitMetadataSerDe;
+import
org.apache.hudi.common.table.timeline.versioning.DefaultInstantComparator;
import
org.apache.hudi.common.table.timeline.versioning.DefaultInstantFileNameGenerator;
import
org.apache.hudi.common.table.timeline.versioning.DefaultInstantFileNameParser;
import
org.apache.hudi.common.table.timeline.versioning.DefaultInstantGenerator;
@@ -92,6 +94,7 @@ public class HoodieTestUtils {
public static final InstantFileNameGenerator INSTANT_FILE_NAME_GENERATOR =
new DefaultInstantFileNameGenerator();
public static final InstantFileNameParser INSTANT_FILE_NAME_PARSER = new
DefaultInstantFileNameParser();
public static final CommitMetadataSerDe COMMIT_METADATA_SER_DE = new
DefaultCommitMetadataSerDe();
+ public static final InstantComparator INSTANT_COMPARATOR = new
DefaultInstantComparator();
public static StorageConfiguration<Configuration> getDefaultStorageConf() {
return (StorageConfiguration<Configuration>)
ReflectionUtils.loadClass(HADOOP_STORAGE_CONF,