danny0405 commented on code in PR #12206:
URL: https://github.com/apache/hudi/pull/12206#discussion_r1830372556
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -154,46 +155,74 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
     this.sizeEstimator = new DefaultSizeEstimator();
     this.statuses = new ArrayList<>();
     this.recordProperties.putAll(config.getProps());
-    this.shouldWriteRecordPositions = config.shouldWriteRecordPositions();
+    this.shouldWriteRecordPositions = config.shouldWriteRecordPositions()
+        // record positions supported only from table version 8
+        && config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT);
   }
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T, I, K, O> hoodieTable,
                             String partitionPath, String fileId, TaskContextSupplier sparkTaskContextSupplier) {
     this(config, instantTime, hoodieTable, partitionPath, fileId, null, sparkTaskContextSupplier);
   }
 
-  private void init(HoodieRecord record) {
-    if (!doInit) {
-      return;
-    }
-
-    String prevCommit = instantTime;
+  private void populateWriteStat(HoodieRecord record, HoodieDeltaWriteStat deltaWriteStat) {
+    HoodieTableVersion tableVersion = hoodieTable.version();
+    String prevCommit;
     String baseFile = "";
     List<String> logFiles = new ArrayList<>();
-    if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
-      // the cdc reader needs the base file metadata to have deterministic update sequence.
+
+    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+      // table versions 8 and greater.
+      prevCommit = instantTime;
+      if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+        // the cdc reader needs the base file metadata to have deterministic update sequence.
+        TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
+        Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
+        if (fileSlice.isPresent()) {
+          prevCommit = fileSlice.get().getBaseInstantTime();
+          baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+          logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
+        }
+      }
+    } else {
+      // older table versions.
       TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
       Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
       if (fileSlice.isPresent()) {
         prevCommit = fileSlice.get().getBaseInstantTime();
         baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
         logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
+      } else {
+        // Set the base commit time as the current instantTime for new inserts into log files
+        prevCommit = instantTime;
+        // Handle log file only case. This is necessary for the concurrent clustering and writer case (e.g., consistent hashing bucket index).
+        // NOTE: flink engine use instantTime to mark operation type, check BaseFlinkCommitActionExecutor::execute
+        if (record.getCurrentLocation() != null && HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime())) {
Review Comment:
It should be just `getFileInstant(record)`.
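
For illustration, here is a minimal, self-contained sketch of the shape the reviewer is pointing at: fold the null/validity check on the record's current location into one helper so the call site shrinks to a single call. All names below (`FileInstantSketch`, `Location`, `resolveFileInstant`) are hypothetical stand-ins for this sketch, not Hudi's API; only `getFileInstant(record)` is the helper the reviewer names.

```java
// Hypothetical sketch, not the PR's code: the helper owns the
// "valid instant on the current location" check, so callers need
// one line instead of the inline condition shown in the diff.
final class FileInstantSketch {
  record Location(String instantTime) {}

  // Assumption: instants are 17-digit yyyyMMddHHmmssSSS timestamps.
  static boolean isValidInstantTime(String s) {
    return s != null && s.matches("\\d{17}");
  }

  // Mirrors what getFileInstant(record) presumably does: prefer the
  // instant tagged on the record's location, else fall back to the
  // handle's own instantTime.
  static String resolveFileInstant(Location current, String instantTime) {
    if (current != null && isValidInstantTime(current.instantTime())) {
      return current.instantTime();
    }
    return instantTime;
  }

  public static void main(String[] args) {
    // tagged instant wins
    System.out.println(resolveFileInstant(new Location("20241105123045678"), "20241106000000000"));
    // no location tagged: fall back to the handle's instant
    System.out.println(resolveFileInstant(null, "20241106000000000"));
  }
}
```

With a helper of that shape in place, the inline condition in the diff would collapse to a single `getFileInstant(record)` call, which appears to be the simplification the reviewer is asking for.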