vinothchandar commented on code in PR #12206:
URL: https://github.com/apache/hudi/pull/12206#discussion_r1839328071


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -154,46 +155,74 @@ public HoodieAppendHandle(HoodieWriteConfig config, 
String instantTime, HoodieTa
     this.sizeEstimator = new DefaultSizeEstimator();
     this.statuses = new ArrayList<>();
     this.recordProperties.putAll(config.getProps());
-    this.shouldWriteRecordPositions = config.shouldWriteRecordPositions();
+    this.shouldWriteRecordPositions = config.shouldWriteRecordPositions()
+        // record positions supported only from table version 8
+        && 
config.getWriteVersion().greaterThanOrEquals(HoodieTableVersion.EIGHT);
   }
 
   public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T, I, K, O> hoodieTable,
                             String partitionPath, String fileId, 
TaskContextSupplier sparkTaskContextSupplier) {
     this(config, instantTime, hoodieTable, partitionPath, fileId, null, 
sparkTaskContextSupplier);
   }
 
-  private void init(HoodieRecord record) {
-    if (!doInit) {
-      return;
-    }
-
-    String prevCommit = instantTime;
+  private void populateWriteStat(HoodieRecord record, HoodieDeltaWriteStat 
deltaWriteStat) {
+    HoodieTableVersion tableVersion = hoodieTable.version();
+    String prevCommit;
     String baseFile = "";
     List<String> logFiles = new ArrayList<>();
-    if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
-      // the cdc reader needs the base file metadata to have deterministic 
update sequence.
+
+    if (tableVersion.greaterThanOrEquals(HoodieTableVersion.EIGHT)) {
+      // table versions 8 and greater.
+      prevCommit = instantTime;
+      if (hoodieTable.getMetaClient().getTableConfig().isCDCEnabled()) {
+        // the cdc reader needs the base file metadata to have deterministic 
update sequence.
+        TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
+        Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, 
fileId);
+        if (fileSlice.isPresent()) {
+          prevCommit = fileSlice.get().getBaseInstantTime();
+          baseFile = 
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+          logFiles = 
fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
+        }
+      }
+    } else {
+      // older table versions.
       TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
       Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, 
fileId);
       if (fileSlice.isPresent()) {
         prevCommit = fileSlice.get().getBaseInstantTime();
         baseFile = 
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
         logFiles = 
fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
+      } else {
+        // Set the base commit time as the current instantTime for new inserts 
into log files
+        prevCommit = instantTime;
+        // Handle log file only case. This is necessary for the concurrent 
clustering and writer case (e.g., consistent hashing bucket index).
+        // NOTE: flink engine use instantTime to mark operation type, check 
BaseFlinkCommitActionExecutor::execute
+        if (record.getCurrentLocation() != null && 
HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime()))
 {

Review Comment:
   @danny0405 can you check whether you are okay with this? I renamed it and pushed a change.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to