wzx140 commented on code in PR #5629:
URL: https://github.com/apache/hudi/pull/5629#discussion_r909635425
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/HoodieCompactor.java:
##########
@@ -194,6 +194,8 @@ public List<WriteStatus> compact(HoodieCompactionHandler
compactionHandler,
.withBitCaskDiskMapCompressionEnabled(config.getCommonConfig().isBitCaskDiskMapCompressionEnabled())
.withOperationField(config.allowOperationMetadataField())
.withPartition(operation.getPartitionPath())
+ .withRecordType(config.getRecordType())
+ .withCombiningEngineClassFQN(config.getMergeClass())
Review Comment:
Fixed
##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java:
##########
@@ -140,29 +136,31 @@ public abstract class AbstractHoodieLogRecordReader {
private Option<String> partitionName;
// Populate meta fields for the records
private boolean populateMetaFields = true;
+ // Record type read from log block
+ protected final HoodieRecordType recordType;
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath,
List<String> logFilePaths,
Schema readerSchema,
String latestInstantTime, boolean
readBlocksLazily, boolean reverseReader,
int bufferSize, Option<InstantRange>
instantRange,
- boolean withOperationField) {
+ boolean withOperationField,
HoodieRecordType recordType, String combiningEngineClassFQN) {
this(fs, basePath, logFilePaths, readerSchema, latestInstantTime,
readBlocksLazily, reverseReader, bufferSize,
- instantRange, withOperationField, true, Option.empty(),
InternalSchema.getEmptyInternalSchema());
+ instantRange, withOperationField, true, Option.empty(),
InternalSchema.getEmptyInternalSchema(), recordType, combiningEngineClassFQN);
}
protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath,
List<String> logFilePaths,
Schema readerSchema, String
latestInstantTime, boolean readBlocksLazily,
boolean reverseReader, int
bufferSize, Option<InstantRange> instantRange,
boolean withOperationField, boolean
forceFullScan,
- Option<String> partitionName,
InternalSchema internalSchema) {
+ Option<String> partitionName,
InternalSchema internalSchema, HoodieRecordType recordType, String
combiningEngineClassFQN) {
this.readerSchema = readerSchema;
this.latestInstantTime = latestInstantTime;
this.hoodieTableMetaClient =
HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
// load class from the payload fully qualified class name
HoodieTableConfig tableConfig =
this.hoodieTableMetaClient.getTableConfig();
this.payloadClassFQN = tableConfig.getPayloadClass();
this.preCombineField = tableConfig.getPreCombineField();
- this.mergeClassFQN = tableConfig.getMergeClass();
+ this.mergeClassFQN = combiningEngineClassFQN;
Review Comment:
Fixed
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]