This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 5e713cf95fd [HUDI-8635] Support numWrites metric for compaction (#13047)
5e713cf95fd is described below
commit 5e713cf95fda451bfec21a9c7ac68d2191d10ae6
Author: Lin Liu <[email protected]>
AuthorDate: Sat Mar 29 04:18:25 2025 -0700
[HUDI-8635] Support numWrites metric for compaction (#13047)
* Support totalLogRecords metric
* Add test for the compaction stats
* Address comments
* Address all comments
* Address comments
* Fix a failed test
* Add support for numWrites
* Address the comments
* Fixing inserts with log files
---------
Co-authored-by: sivabalan <[email protected]>
---
.../table/log/BaseHoodieLogRecordReader.java | 1 +
.../common/table/read/FileGroupRecordBuffer.java | 10 ++++++++++
.../table/read/HoodieFileGroupRecordBuffer.java | 5 +++++
.../read/PositionBasedFileGroupRecordBuffer.java | 18 +++++++++++++++---
.../table/action/compact/TestHoodieCompactor.java | 22 +++++++++++++++++++---
5 files changed, 50 insertions(+), 6 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
index b74a0c9b92f..1586e3f5ccc 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/log/BaseHoodieLogRecordReader.java
@@ -392,6 +392,7 @@ public abstract class BaseHoodieLogRecordReader<T> {
// Done
progress = 1.0f;
+ totalLogRecords.set(recordBuffer.getTotalLogRecords());
} catch (IOException e) {
LOG.error("Got IOException when reading log file", e);
throw new HoodieIOException("IOException when reading log file ", e);
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
index 38ac956e07b..15c22883716 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/FileGroupRecordBuffer.java
@@ -99,6 +99,7 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
protected boolean enablePartialMerging = false;
protected InternalSchema internalSchema;
protected HoodieTableMetaClient hoodieTableMetaClient;
+ private long totalLogRecords = 0;
protected FileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
HoodieTableMetaClient hoodieTableMetaClient,
@@ -215,6 +216,10 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
return records.size();
}
+ public long getTotalLogRecords() {
+ return totalLogRecords;
+ }
+
@Override
public Iterator<Pair<Option<T>, Map<String, Object>>> getLogRecordIterator()
{
return records.values().iterator();
@@ -238,6 +243,7 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
Map<String, Object> metadata,
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair)
throws IOException {
+ totalLogRecords++;
if (existingRecordMetadataPair != null) {
if (enablePartialMerging) {
// TODO(HUDI-7843): decouple the merging logic from the merger
@@ -341,6 +347,7 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
*/
protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord
deleteRecord,
Pair<Option<T>,
Map<String, Object>> existingRecordMetadataPair) {
+ totalLogRecords++;
if (existingRecordMetadataPair != null) {
switch (recordMergeMode) {
case COMMIT_TIME_ORDERING:
@@ -572,7 +579,10 @@ public abstract class FileGroupRecordBuffer<T> implements
HoodieFileGroupRecordB
nextRecordInfo.getLeft(), nextRecordInfo.getRight());
if (resultRecord.isPresent()) {
nextRecord = readerContext.seal(resultRecord.get());
+ readStats.incrementNumInserts();
return true;
+ } else {
+ readStats.incrementNumDeletes();
}
}
return false;
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
index d9ba8bcd90e..a7b9423be2e 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/HoodieFileGroupRecordBuffer.java
@@ -90,6 +90,11 @@ public interface HoodieFileGroupRecordBuffer<T> {
*/
int size();
+ /**
+ * @return the total number of log records processed.
+ */
+ long getTotalLogRecords();
+
/**
* @return An iterator on the log records.
*/
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
index b0e039a4b68..bd450b77d31 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/table/read/PositionBasedFileGroupRecordBuffer.java
@@ -246,9 +246,21 @@ public class PositionBasedFileGroupRecordBuffer<T> extends
KeyBasedFileGroupReco
Map<String, Object> metadata = readerContext.generateMetadataForRecord(
baseRecord, readerSchema);
- Option<T> resultRecord = logRecordInfo != null
- ? merge(Option.of(baseRecord), metadata, logRecordInfo.getLeft(),
logRecordInfo.getRight())
- : merge(Option.empty(), Collections.emptyMap(), Option.of(baseRecord),
metadata);
+ Option<T> resultRecord = Option.empty();
+ if (logRecordInfo != null) {
+ resultRecord = merge(
+ Option.of(baseRecord), metadata, logRecordInfo.getLeft(),
logRecordInfo.getRight());
+ if (resultRecord.isPresent()) {
+ nextRecord = readerContext.seal(resultRecord.get());
+ readStats.incrementNumUpdates();
+ } else {
+ readStats.incrementNumDeletes();
+ }
+ } else {
+ resultRecord = merge(Option.empty(), Collections.emptyMap(),
Option.of(baseRecord), metadata);
+ readStats.incrementNumInserts();
+ }
+
if (resultRecord.isPresent()) {
nextRecord = readerContext.seal(resultRecord.get());
return true;
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
index ceefcdf4924..ae05f28e8f3 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
+++
b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/table/action/compact/TestHoodieCompactor.java
@@ -247,7 +247,7 @@ public class TestHoodieCompactor extends
HoodieSparkClientTestHarness {
assertLogFilesNumEqualsTo(config, i);
}
HoodieWriteMetadata result = compact(writeClient, String.format("10%s",
i));
- verifyCompaction(result);
+ verifyCompaction(result, 4000L);
// Verify compaction.requested, compaction.completed metrics counts.
assertEquals(1,
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
@@ -282,7 +282,7 @@ public class TestHoodieCompactor extends
HoodieSparkClientTestHarness {
assertLogFilesNumEqualsTo(config, 1);
HoodieWriteMetadata result = compact(writeClient,
writeClient.createNewInstantTime());
- verifyCompaction(result);
+ verifyCompaction(result, 100L);
// Verify compaction.requested, compaction.completed metrics counts.
assertEquals(i / 2 + 1,
getCompactionMetricCount(HoodieTimeline.REQUESTED_COMPACTION_SUFFIX));
@@ -470,13 +470,14 @@ public class TestHoodieCompactor extends
HoodieSparkClientTestHarness {
/**
* Verify that all partition paths are present in the HoodieWriteMetadata
result.
*/
- private void verifyCompaction(HoodieWriteMetadata compactionMetadata) {
+ private void verifyCompaction(HoodieWriteMetadata compactionMetadata, long
expectedTotalLogRecords) {
assertTrue(compactionMetadata.getWriteStats().isPresent());
List<HoodieWriteStat> stats = (List<HoodieWriteStat>)
compactionMetadata.getWriteStats().get();
assertEquals(dataGen.getPartitionPaths().length, stats.size());
for (String partitionPath : dataGen.getPartitionPaths()) {
assertTrue(stats.stream().anyMatch(stat ->
stat.getPartitionPath().contentEquals(partitionPath)));
}
+
stats.forEach(stat -> {
HoodieWriteStat.RuntimeStats runtimeStats = stat.getRuntimeStats();
assertNotNull(runtimeStats);
@@ -484,5 +485,20 @@ public class TestHoodieCompactor extends
HoodieSparkClientTestHarness {
assertTrue(runtimeStats.getTotalUpsertTime() > 0);
assertTrue(runtimeStats.getTotalScanTime() > 0);
});
+
+ // Verify the number of log records processed during the compaction.
+ long actualTotalLogRecords =
+ stats.stream().mapToLong(HoodieWriteStat::getTotalLogRecords).sum();
+ assertEquals(expectedTotalLogRecords, actualTotalLogRecords);
+
+ // Verify the number of records written during compaction.
+ long actualNumWritten =
+ stats.stream().mapToLong(HoodieWriteStat::getNumWrites).sum();
+ long actualNumUpdates =
+ stats.stream().mapToLong(HoodieWriteStat::getNumUpdateWrites).sum();
+ long actualInserts =
+ stats.stream().mapToLong(HoodieWriteStat::getNumInserts).sum();
+ assertTrue(actualNumWritten > 0
+ && actualNumWritten == actualNumUpdates + actualInserts);
}
}