This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 6f57bbf [HUDI-3069] Improve HoodieMergedLogRecordScanner to avoid
putting unnecessary hoodie records (#4932)
6f57bbf is described below
commit 6f57bbfac4881388d73a9e0768498fe47fe6bb5a
Author: 苏承祥 <[email protected]>
AuthorDate: Mon Mar 7 14:35:55 2022 +0800
[HUDI-3069] Improve HoodieMergedLogRecordScanner to avoid putting unnecessary
hoodie records (#4932)
* log scanner optimization
* payload comparison switches from `equals()` to reference equality (`==`)
Co-authored-by: 苏承祥 <[email protected]>
---
.../common/table/log/HoodieMergedLogRecordScanner.java | 17 ++++++++---------
1 file changed, 8 insertions(+), 9 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index d0ab73a..882e105 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -59,18 +59,15 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
private static final Logger LOG =
LogManager.getLogger(HoodieMergedLogRecordScanner.class);
-
+ // A timer for calculating elapsed time in millis
+ public final HoodieTimer timer = new HoodieTimer();
// Final map of compacted/merged records
protected final ExternalSpillableMap<String, HoodieRecord<? extends
HoodieRecordPayload>> records;
-
// count of merged records in log
private long numMergedRecordsInLog;
private long maxMemorySizeInBytes;
-
// Stores the total time taken to perform reading and merging of log blocks
private long totalTimeTakenToReadAndMergeBlocks;
- // A timer for calculating elapsed time in millis
- public final HoodieTimer timer = new HoodieTimer();
@SuppressWarnings("unchecked")
protected HoodieMergedLogRecordScanner(FileSystem fs, String basePath,
List<String> logFilePaths, Schema readerSchema,
@@ -143,9 +140,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
HoodieRecord<? extends HoodieRecordPayload> oldRecord = records.get(key);
HoodieRecordPayload oldValue = oldRecord.getData();
HoodieRecordPayload combinedValue =
hoodieRecord.getData().preCombine(oldValue);
- boolean choosePrev = combinedValue.equals(oldValue);
- HoodieOperation operation = choosePrev ? oldRecord.getOperation() :
hoodieRecord.getOperation();
- records.put(key, new HoodieAvroRecord<>(new HoodieKey(key,
hoodieRecord.getPartitionPath()), combinedValue, operation));
+ // If combinedValue is oldValue, no need rePut oldRecord
+ if (combinedValue != oldValue) {
+ HoodieOperation operation = hoodieRecord.getOperation();
+ records.put(key, new HoodieAvroRecord<>(new HoodieKey(key,
hoodieRecord.getPartitionPath()), combinedValue, operation));
+ }
} else {
// Put the record as is
records.put(key, hoodieRecord);
@@ -187,11 +186,11 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
protected boolean isBitCaskDiskMapCompressionEnabled =
HoodieCommonConfig.DISK_MAP_BITCASK_COMPRESSION_ENABLED.defaultValue();
// incremental filtering
protected Option<InstantRange> instantRange = Option.empty();
+ protected String partitionName;
// auto scan default true
private boolean autoScan = true;
// operation field default false
private boolean withOperationField = false;
- protected String partitionName;
@Override
public Builder withFileSystem(FileSystem fs) {