danny0405 commented on code in PR #9776:
URL: https://github.com/apache/hudi/pull/9776#discussion_r1349441335


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -164,66 +163,68 @@ public HoodieAppendHandle(HoodieWriteConfig config, 
String instantTime, HoodieTa
 
   private void init(HoodieRecord record) {
     if (doInit) {
-      // extract some information from the first record
-      SliceView rtView = hoodieTable.getSliceView();
-      Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, 
fileId);
-      // Set the base commit time as the current instantTime for new inserts 
into log files
-      String baseInstantTime;
+      String prevCommit = instantTime;
       String baseFile = "";
       List<String> logFiles = new ArrayList<>();
-      if (fileSlice.isPresent()) {
-        baseInstantTime = fileSlice.get().getBaseInstantTime();
-        baseFile = 
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
-        logFiles = 
fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
-      } else {
-        baseInstantTime = instantTime;
-        // Handle log file only case. This is necessary for the concurrent 
clustering and writer case (e.g., consistent hashing bucket index).
-        // NOTE: flink engine use instantTime to mark operation type, check 
BaseFlinkCommitActionExecutor::execute
-        if (record.getCurrentLocation() != null && 
HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime()))
 {
-          baseInstantTime = record.getCurrentLocation().getInstantTime();
+      if (config.isCDCEnabled()) {
+        // the cdc reader needs the base file metadata to have deterministic 
update sequence.
+        TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
+        Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, 
fileId);
+        if (fileSlice.isPresent()) {
+          prevCommit = fileSlice.get().getBaseInstantTime();
+          baseFile = 
fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+          logFiles = 
fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
         }
-        // This means there is no base data file, start appending to a new log 
file
-        fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, 
this.fileId));
-        LOG.info("New AppendHandle for partition :" + partitionPath);
       }
 
       // Prepare the first write status
-      writeStatus.setStat(new HoodieDeltaWriteStat());
+      HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
+      writeStatus.setStat(deltaWriteStat);
       writeStatus.setFileId(fileId);
       writeStatus.setPartitionPath(partitionPath);
       averageRecordSize = sizeEstimator.sizeEstimate(record);
 
-      HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat) 
writeStatus.getStat();
-      deltaWriteStat.setPrevCommit(baseInstantTime);
+      deltaWriteStat.setPrevCommit(prevCommit);
       deltaWriteStat.setPartitionPath(partitionPath);
       deltaWriteStat.setFileId(fileId);
       deltaWriteStat.setBaseFile(baseFile);
       deltaWriteStat.setLogFiles(logFiles);
 
       try {
         // Save hoodie partition meta in the partition path
-        HoodiePartitionMetadata partitionMetadata = new 
HoodiePartitionMetadata(fs, baseInstantTime,
+        HoodiePartitionMetadata partitionMetadata = new 
HoodiePartitionMetadata(fs, instantTime,
             new Path(config.getBasePath()), 
FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
             hoodieTable.getPartitionMetafileFormat());
         partitionMetadata.trySave(getPartitionId());
 
-        // Since the actual log file written to can be different based on when 
rollover happens, we use the
-        // base file to denote some log appends happened on a slice. 
writeToken will still fence concurrent
-        // writers.
-        // https://issues.apache.org/jira/browse/HUDI-1517
-        createMarkerFile(partitionPath, 
FSUtils.makeBaseFileName(baseInstantTime, writeToken, fileId, 
hoodieTable.getBaseFileExtension()));
-
-        this.writer = createLogWriter(fileSlice, baseInstantTime);
+        this.writer = createLogWriter(getFileInstant(record));
       } catch (Exception e) {
         LOG.error("Error in update task at commit " + instantTime, e);
         writeStatus.setGlobalError(e);
         throw new HoodieUpsertException("Failed to initialize 
HoodieAppendHandle for FileId: " + fileId + " on commit "
-            + instantTime + " on HDFS path " + 
hoodieTable.getMetaClient().getBasePath() + "/" + partitionPath, e);
+            + instantTime + " on HDFS path " + 
hoodieTable.getMetaClient().getBasePathV2() + "/" + partitionPath, e);
       }
       doInit = false;
     }
   }
 
+  /**
+   * Returns the instant time to use in the log file name.
+   */
+  private String getFileInstant(HoodieRecord<?> record) {
+    if (config.isConsistentHashingEnabled()) {
+      // Handle log file only case. This is necessary for the concurrent 
clustering and writer case (e.g., consistent hashing bucket index).
+      // NOTE: flink engine use instantTime to mark operation type, check 
BaseFlinkCommitActionExecutor::execute

Review Comment:
   I just kept the logic compatible with the previous behavior, but I agree that 
we should have a better abstraction for the index/engine-specific logic. That 
can be done in follow-up PRs: https://issues.apache.org/jira/browse/HUDI-6920



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to