danny0405 commented on code in PR #13699:
URL: https://github.com/apache/hudi/pull/13699#discussion_r2280067070
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/FileGroupReaderBasedMergeHandle.java:
##########
@@ -156,40 +215,53 @@ private void init(CompactionOperation operation, String
partitionPath) {
fileWriter = HoodieFileWriterFactory.getFileWriter(instantTime,
newFilePath, hoodieTable.getStorage(),
config, writeSchemaWithMetaFields, taskContextSupplier, recordType);
} catch (IOException io) {
- LOG.error("Error in update task at commit {}", instantTime, io);
writeStatus.setGlobalError(io);
throw new HoodieUpsertException("Failed to initialize HoodieUpdateHandle
for FileId: " + fileId + " on commit "
+ instantTime + " on path " +
hoodieTable.getMetaClient().getBasePath(), io);
}
}
+ @Override
+ protected void populateIncomingRecordsMap(Iterator<HoodieRecord<T>>
newRecordsItr) {
+ // no op.
+ }
+
+ /**
+ * This is only for spark, the engine context fetched from a serialized
hoodie table is always local,
+ * overrides it to spark specific reader context.
+ */
+ public void setReaderContext(HoodieReaderContext<T> readerContext) {
+ this.readerContext = readerContext;
+ }
+
/**
* Reads the file slice of a compaction operation using a file group reader,
* by getting an iterator of the records; then writes the records to a new
base file.
*/
@Override
public void doMerge() {
+ // For non-compaction operations, the merger needs to be initialized with
the writer properties to handle cases like Merge-Into commands
+ if (operation.isEmpty()) {
+ this.readerContext.initRecordMergerForIngestion(config.getProps());
+ }
boolean usePosition =
config.getBooleanOrDefault(MERGE_USE_RECORD_POSITIONS);
- Option<InternalSchema> internalSchemaOption =
SerDeHelper.fromJson(config.getInternalSchema());
- TypedProperties props = TypedProperties.copy(config.getProps());
+ Option<InternalSchema> internalSchemaOption =
SerDeHelper.fromJson(config.getInternalSchema())
+ .map(internalSchema ->
AvroSchemaEvolutionUtils.reconcileSchema(writeSchemaWithMetaFields,
internalSchema,
+
config.getBooleanOrDefault(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS)));
long maxMemoryPerCompaction =
IOUtils.getMaxMemoryPerCompaction(taskContextSupplier, config);
props.put(HoodieMemoryConfig.MAX_MEMORY_FOR_MERGE.key(),
String.valueOf(maxMemoryPerCompaction));
- Stream<HoodieLogFile> logFiles =
operation.getDeltaFileNames().stream().map(logFileName ->
+ Option<Stream<HoodieLogFile>> logFilesStreamOpt = operation.map(op ->
op.getDeltaFileNames().stream().map(logFileName ->
new HoodieLogFile(new StoragePath(FSUtils.constructAbsolutePath(
- config.getBasePath(), operation.getPartitionPath()),
logFileName)));
+ config.getBasePath(), op.getPartitionPath()), logFileName))));
// Initializes file group reader
- try (HoodieFileGroupReader<T> fileGroupReader =
HoodieFileGroupReader.<T>newBuilder().withReaderContext(readerContext).withHoodieTableMetaClient(hoodieTable.getMetaClient())
-
.withLatestCommitTime(maxInstantTime).withPartitionPath(partitionPath).withBaseFileOption(Option.ofNullable(baseFileToMerge)).withLogFiles(logFiles)
-
.withDataSchema(writeSchemaWithMetaFields).withRequestedSchema(writeSchemaWithMetaFields).withInternalSchema(internalSchemaOption).withProps(props)
-
.withShouldUseRecordPosition(usePosition).withSortOutput(hoodieTable.requireSortedRecords())
- .withFileGroupUpdateCallback(cdcLogger.map(logger -> new
CDCCallback(logger,
readerContext))).withEnableOptimizedLogBlockScan(config.enableOptimizedLogBlocksScan()).build())
{
+ try (HoodieFileGroupReader<T> fileGroupReader =
getFileGroupReader(usePosition, internalSchemaOption, props, logFilesStreamOpt,
incomingRecordsItr)) {
// Reads the records from the file slice
try (ClosableIterator<HoodieRecord<T>> recordIterator =
fileGroupReader.getClosableHoodieRecordIterator()) {
while (recordIterator.hasNext()) {
HoodieRecord<T> record = recordIterator.next();
+ Option<Map<String, String>> recordMetadata =
getRecordMetadata(record, writeSchema, props);
Review Comment:
we should also fix `HoodieWriteHandle` to keep the behavior aligned: skip
the event time tracing when `preserveMetadata` is true.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]