codope commented on code in PR #9776:
URL: https://github.com/apache/hudi/pull/9776#discussion_r1348240543


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/HoodieTable.java:
##########
@@ -726,19 +727,24 @@ protected void reconcileAgainstMarkers(HoodieEngineContext context,
         return;
       }
 
-      // we are not including log appends here, since they are already fail-safe.
-      Set<String> invalidDataPaths = getInvalidDataPaths(markers);
-      Set<String> validDataPaths = stats.stream()
+      // Ignores log file appended for update, since they are already fail-safe.
+      // but new created log files should be included.
+      Set<String> invalidFilePaths = getInvalidDataPaths(markers);
Review Comment:
   Let's avoid the rename? The method name is still `getInvalidDataPaths`.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -951,12 +951,6 @@ private void startCommit(String instantTime, String actionType, HoodieTableMetaC
             + "table could be in an inconsistent state. Pending restores: " + Arrays.toString(inflightRestoreTimeline.getInstantsAsStream()
             .map(instant -> instant.getTimestamp()).collect(Collectors.toList()).toArray()));
 
-    // if there are pending compactions, their instantTime must not be greater than that of this instant time
-    metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending ->

Review Comment:
   Let's take this scenario where I have a pipeline with a single writer W1, but I'm also running an offline compactor (separate job) W2:
   At t=10, 10.deltacommit completed by W1.
   At t=11, 11.deltacommit requested by W1 (as in, it just generated a new instant time, but the commit has not gone inflight).
   At t=12, 12.compaction.requested by W2, which goes inflight at t=12.5, let's say.
   At t=13, 11.deltacommit.inflight.

   This validation used to guard against such scenarios. But now we're saying that it's ok to let both the compaction and the deltacommit proceed. The compaction plan will proceed as usual with the latest file slices at t=10. 11.deltacommit will create a new slice, which will be missed by the compactor. However, the filesystem view API based on the new slicing algorithm will ensure that the new file slice is considered after the barrier created by the compaction base file.
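   To make the scenario concrete, here is a minimal, self-contained sketch of the validation being removed (`PendingCompactionGuard` is a hypothetical name; real Hudi compares `HoodieInstant` timestamps on the active timeline, and instant times are modeled here as equal-length, lexicographically ordered strings):

```java
import java.util.TreeSet;

// Hypothetical model of the removed startCommit validation: a new delta commit
// must not carry an instant time smaller than the latest pending compaction's.
public class PendingCompactionGuard {

  public static void validate(TreeSet<String> pendingCompactionInstants, String newInstantTime) {
    if (!pendingCompactionInstants.isEmpty()) {
      String latestPending = pendingCompactionInstants.last();
      if (latestPending.compareTo(newInstantTime) > 0) {
        // Mirrors the removed check: W1's 11.deltacommit would be rejected
        // here once W2's 12.compaction.requested exists on the timeline.
        throw new IllegalStateException("Found pending compaction " + latestPending
            + " with instant time greater than new instant " + newInstantTime);
      }
    }
  }
}
```

   Under the new slicing algorithm described above, this rejection is no longer needed: the file slice written by 11.deltacommit is placed after the barrier created by the compaction base file instead.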



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -164,66 +163,68 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
 
   private void init(HoodieRecord record) {
     if (doInit) {
-      // extract some information from the first record
-      SliceView rtView = hoodieTable.getSliceView();
-      Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
-      // Set the base commit time as the current instantTime for new inserts into log files
-      String baseInstantTime;
+      String prevCommit = instantTime;
       String baseFile = "";
       List<String> logFiles = new ArrayList<>();
-      if (fileSlice.isPresent()) {
-        baseInstantTime = fileSlice.get().getBaseInstantTime();
-        baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
-        logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
-      } else {
-        baseInstantTime = instantTime;
-        // Handle log file only case. This is necessary for the concurrent clustering and writer case (e.g., consistent hashing bucket index).
-        // NOTE: flink engine use instantTime to mark operation type, check BaseFlinkCommitActionExecutor::execute
-        if (record.getCurrentLocation() != null && HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime())) {
-          baseInstantTime = record.getCurrentLocation().getInstantTime();
+      if (config.isCDCEnabled()) {

Review Comment:
   We need to have a test covering the CDC flow.



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -544,15 +543,27 @@ public static Option<Pair<Integer, String>> getLatestLogVersion(FileSystem fs, P
   }
 
   /**
-   * computes the next log version for the specified fileId in the partition path.
+   * Returns whether the given path exists on the filesystem or false if any exception occurs.
    */
-  public static int computeNextLogVersion(FileSystem fs, Path partitionPath, final String fileId,
-      final String logFileExtension, final String baseCommitTime) throws IOException {
-    Option<Pair<Integer, String>> currentVersionWithWriteToken =
-        getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, baseCommitTime);
-    // handle potential overflow
-    return (currentVersionWithWriteToken.isPresent()) ? currentVersionWithWriteToken.get().getKey() + 1
-        : HoodieLogFile.LOGFILE_BASE_VERSION;
+  private static boolean fileExists(FileSystem fs, Path path) {

Review Comment:
   Why do we need this separately? Are we making more `fs.exists` calls than we already have?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -69,6 +84,64 @@ public CompletionTimeQueryView(HoodieTableMetaClient metaClient, String startIns
     load();
   }
 
+  /**
+   * Returns whether the instant is completed.
+   */
+  public boolean isCompleted(String instantTime) {
+    return getCompletionTime(instantTime).isPresent();
+  }
+
+  /**
+   * Returns whether the give instant time {@code instantTime} completed before the base instant {@code baseInstant}.
+   */
+  public boolean isCompletedBefore(String baseInstant, String instantTime) {
+    Option<String> completionTimeOpt = getCompletionTime(baseInstant, instantTime);
+    if (completionTimeOpt.isPresent()) {
+      return HoodieTimeline.compareTimestamps(completionTimeOpt.get(), LESSER_THAN, baseInstant);
+    }
+    return false;
+  }
+
+  /**
+   * Returns whether the give instant time {@code instantTime} is sliced after or on the base instant {@code baseInstant}.
+   */
+  public boolean isSlicedAfterOrOn(String baseInstant, String instantTime) {
+    Option<String> completionTimeOpt = getCompletionTime(baseInstant, instantTime);
+    if (completionTimeOpt.isPresent()) {
+      return HoodieTimeline.compareTimestamps(completionTimeOpt.get(), GREATER_THAN_OR_EQUALS, baseInstant);
+    }
+    return true;
+  }
+
+  /**
+   * Get completion time with a base instant time as a reference to fix the compatibility.
+   *
+   * @param baseInstant The base instant
+   * @param instantTime The instant time to query the completion time with
+   *
+   * @return Probability fixed completion time.
+   */
+  public Option<String> getCompletionTime(String baseInstant, String instantTime) {

Review Comment:
   Need to cover these public methods in unit tests. I did see a test in `TestHoodieFileGroup#testGetBaseInstantTime` with a mocked CompletionTimeQueryView. But can we test the negative overlapping-writes scenarios and ensure we're getting the view in the correct order?
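   To make the overlapping-writes expectation concrete, here is a self-contained model of the completion-time semantics a unit test could assert (`CompletionTimeModel` is a hypothetical stand-in; the real view resolves completion times from the active and archived timelines):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical model: an instant "overlaps" the base instant when it started
// earlier but completed after it; pending instants have no completion time.
public class CompletionTimeModel {
  private final Map<String, String> completionTimes = new HashMap<>();

  public void complete(String instantTime, String completionTime) {
    completionTimes.put(instantTime, completionTime);
  }

  // Mirrors isCompletedBefore: true only when the completion time is
  // strictly smaller than the base instant time.
  public boolean isCompletedBefore(String baseInstant, String instantTime) {
    return Optional.ofNullable(completionTimes.get(instantTime))
        .map(completion -> completion.compareTo(baseInstant) < 0)
        .orElse(false);
  }

  // Mirrors isSlicedAfterOrOn: pending instants default to true.
  public boolean isSlicedAfterOrOn(String baseInstant, String instantTime) {
    return Optional.ofNullable(completionTimes.get(instantTime))
        .map(completion -> completion.compareTo(baseInstant) >= 0)
        .orElse(true);
  }
}
```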



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -138,6 +142,20 @@ protected void refreshTimeline(HoodieTimeline visibleActiveTimeline) {
     this.visibleCommitsAndCompactionTimeline = visibleActiveTimeline.getWriteTimeline();
   }
 
+  /**
+   * Refresh the completion time query view.
+   */
+  protected void refreshCompletionTimeQueryView() {
+    this.completionTimeQueryView = new CompletionTimeQueryView(metaClient);

Review Comment:
   This is going to run a full reload, including the archived (slim) timeline. Does it need to happen more than once in the lifecycle of a query or batch write? I see that it can be called by HoodieMetadataFileSystemView as well. We need to be a bit careful here.



##########
hudi-common/src/main/java/org/apache/hudi/common/model/HoodieFileGroup.java:
##########
@@ -101,12 +103,36 @@ public void addBaseFile(HoodieBaseFile dataFile) {
 
   /**
    * Add a new log file into the group.
+   *
+   * <p>CAUTION: the log file must be added in sequence of the delta commit time.

Review Comment:
   The filesystem view calls this while building file groups, but that call is in a synchronized block. However, I am assuming we have a multi-writer and concurrent reader test to validate. Or we can make this visible for testing and start two threads to add log files and validate.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java:
##########
@@ -198,6 +171,26 @@ public List<HoodieRollbackRequest> getRollbackRequests(HoodieInstant instantToRo
     }
   }
 
+  private FileStatus[] listAllFilesBelongsToOrNewerThanVersion(

Review Comment:
   rename to `listAllFilesSinceCommit`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -233,41 +234,45 @@ private static Schema getWriteSchema(HoodieWriteConfig config) {
     return new Schema.Parser().parse(config.getWriteSchema());
   }
 
-  protected HoodieLogFormat.Writer createLogWriter(
-      Option<FileSlice> fileSlice, String baseCommitTime) throws IOException {
-    return createLogWriter(fileSlice, baseCommitTime, null);
+  protected HoodieLogFormat.Writer createLogWriter(String deltaCommitTime) {
+    return createLogWriter(deltaCommitTime, null);
   }
 
-  protected HoodieLogFormat.Writer createLogWriter(
-      Option<FileSlice> fileSlice, String baseCommitTime, String suffix) throws IOException {
-    Option<HoodieLogFile> latestLogFile = fileSlice.isPresent()
-        ? fileSlice.get().getLatestLogFile()
-        : Option.empty();
-
-    return HoodieLogFormat.newWriterBuilder()
-        .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
-        .withFileId(fileId)
-        .overBaseCommit(baseCommitTime)
-        .withLogVersion(latestLogFile.map(HoodieLogFile::getLogVersion).orElse(HoodieLogFile.LOGFILE_BASE_VERSION))
-        .withFileSize(latestLogFile.map(HoodieLogFile::getFileSize).orElse(0L))
-        .withSizeThreshold(config.getLogFileMaxSize())
-        .withFs(fs)
-        .withRolloverLogWriteToken(writeToken)
-        .withLogWriteToken(latestLogFile.map(HoodieLogFile::getLogWriteToken).orElse(writeToken))
-        .withSuffix(suffix)
-        .withFileExtension(HoodieLogFile.DELTA_EXTENSION).build();
-  }
-
-  protected HoodieLogFormat.Writer createLogWriter(String baseCommitTime, String fileSuffix) {
+  protected HoodieLogFormat.Writer createLogWriter(String deltaCommitTime, String fileSuffix) {
     try {
-      return createLogWriter(Option.empty(),baseCommitTime, fileSuffix);
+      return HoodieLogFormat.newWriterBuilder()
+          .onParentPath(FSUtils.getPartitionPath(hoodieTable.getMetaClient().getBasePath(), partitionPath))
+          .withFileId(fileId)
+          .withDeltaCommit(deltaCommitTime)
+          .withFileSize(0L)

Review Comment:
   Shouldn't this be `HoodieLogFile#getFileSize`?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieAppendHandle.java:
##########
@@ -164,66 +163,68 @@ public HoodieAppendHandle(HoodieWriteConfig config, String instantTime, HoodieTa
 
   private void init(HoodieRecord record) {
     if (doInit) {
-      // extract some information from the first record
-      SliceView rtView = hoodieTable.getSliceView();
-      Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
-      // Set the base commit time as the current instantTime for new inserts into log files
-      String baseInstantTime;
+      String prevCommit = instantTime;
       String baseFile = "";
       List<String> logFiles = new ArrayList<>();
-      if (fileSlice.isPresent()) {
-        baseInstantTime = fileSlice.get().getBaseInstantTime();
-        baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
-        logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
-      } else {
-        baseInstantTime = instantTime;
-        // Handle log file only case. This is necessary for the concurrent clustering and writer case (e.g., consistent hashing bucket index).
-        // NOTE: flink engine use instantTime to mark operation type, check BaseFlinkCommitActionExecutor::execute
-        if (record.getCurrentLocation() != null && HoodieInstantTimeGenerator.isValidInstantTime(record.getCurrentLocation().getInstantTime())) {
-          baseInstantTime = record.getCurrentLocation().getInstantTime();
+      if (config.isCDCEnabled()) {
+        // the cdc reader needs the base file metadata to have deterministic update sequence.
+        TableFileSystemView.SliceView rtView = hoodieTable.getSliceView();
+        Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, fileId);
+        if (fileSlice.isPresent()) {
+          prevCommit = fileSlice.get().getBaseInstantTime();
+          baseFile = fileSlice.get().getBaseFile().map(BaseFile::getFileName).orElse("");
+          logFiles = fileSlice.get().getLogFiles().map(HoodieLogFile::getFileName).collect(Collectors.toList());
         }
-        // This means there is no base data file, start appending to a new log file
-        fileSlice = Option.of(new FileSlice(partitionPath, baseInstantTime, this.fileId));
-        LOG.info("New AppendHandle for partition :" + partitionPath);
       }
 
       // Prepare the first write status
-      writeStatus.setStat(new HoodieDeltaWriteStat());
+      HoodieDeltaWriteStat deltaWriteStat = new HoodieDeltaWriteStat();
+      writeStatus.setStat(deltaWriteStat);
       writeStatus.setFileId(fileId);
       writeStatus.setPartitionPath(partitionPath);
       averageRecordSize = sizeEstimator.sizeEstimate(record);
 
-      HoodieDeltaWriteStat deltaWriteStat = (HoodieDeltaWriteStat) writeStatus.getStat();
-      deltaWriteStat.setPrevCommit(baseInstantTime);
+      deltaWriteStat.setPrevCommit(prevCommit);
       deltaWriteStat.setPartitionPath(partitionPath);
       deltaWriteStat.setFileId(fileId);
       deltaWriteStat.setBaseFile(baseFile);
       deltaWriteStat.setLogFiles(logFiles);
 
       try {
         // Save hoodie partition meta in the partition path
-        HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, baseInstantTime,
+        HoodiePartitionMetadata partitionMetadata = new HoodiePartitionMetadata(fs, instantTime,
             new Path(config.getBasePath()), FSUtils.getPartitionPath(config.getBasePath(), partitionPath),
             hoodieTable.getPartitionMetafileFormat());
         partitionMetadata.trySave(getPartitionId());
 
-        // Since the actual log file written to can be different based on when rollover happens, we use the
-        // base file to denote some log appends happened on a slice. writeToken will still fence concurrent
-        // writers.
-        // https://issues.apache.org/jira/browse/HUDI-1517
-        createMarkerFile(partitionPath, FSUtils.makeBaseFileName(baseInstantTime, writeToken, fileId, hoodieTable.getBaseFileExtension()));
-
-        this.writer = createLogWriter(fileSlice, baseInstantTime);
+        this.writer = createLogWriter(getFileInstant(record));
       } catch (Exception e) {
         LOG.error("Error in update task at commit " + instantTime, e);
         writeStatus.setGlobalError(e);
         throw new HoodieUpsertException("Failed to initialize HoodieAppendHandle for FileId: " + fileId + " on commit "
-            + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePath() + "/" + partitionPath, e);
+            + instantTime + " on HDFS path " + hoodieTable.getMetaClient().getBasePathV2() + "/" + partitionPath, e);
       }
       doInit = false;
     }
   }
 
+  /**
+   * Returns the instant time to use in the log file name.
+   */
+  private String getFileInstant(HoodieRecord<?> record) {
+    if (config.isConsistentHashingEnabled()) {
+      // Handle log file only case. This is necessary for the concurrent clustering and writer case (e.g., consistent hashing bucket index).
+      // NOTE: flink engine use instantTime to mark operation type, check BaseFlinkCommitActionExecutor::execute

Review Comment:
   This is possible even when we use certain index types such that inserts can go directly to log files (e.g., the in-memory index). But in that case, we may not be using the instant time to mark the operation type. How do we handle such cases?
   I think this logic is very engine-specific. Is this the right place to call it? Can we not resolve the instant time a layer above?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatWriter.java:
##########
@@ -90,15 +101,25 @@ public void withOutputStream(FSDataOutputStream output) {
    */
   private FSDataOutputStream getOutputStream() throws IOException {
     if (this.output == null) {
-      Path path = logFile.getPath();
-      if (fs.exists(path)) {
-        rollOver();
-        createNewFile();
-        LOG.info("File {} already exists, rolling over to {}", path, logFile.getPath());
-      } else {
-        LOG.info(logFile + " does not exist. Create a new file");
-        // Block size does not matter as we will always manually autoflush
-        createNewFile();
+      boolean created = false;
+      while (!created) {
+        try {
+          // Block size does not matter as we will always manually autoflush
+          createNewFile();
+          LOG.info("Created a new log file: {}", logFile);
+          created = true;
+        } catch (FileAlreadyExistsException ignored) {
+          LOG.info("File {} already exists, rolling over", logFile.getPath());
+          rollOver();
+        } catch (RemoteException re) {
+          if (re.getClassName().contentEquals(AlreadyBeingCreatedException.class.getName())) {
+            LOG.warn("Another task executor writing to the same log file(" + logFile + ", rolling over");
+            // Rollover the current log file (since cannot get a stream handle) and create new one
+            rollOver();
+          } else {
+            throw re;

Review Comment:
   When will it come to this else block? Also, please throw a HoodieException with some message.
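   For illustration, a hedged sketch of the suggested handling (names are assumptions; `HoodieException` is Hudi's runtime wrapper, and a stand-in class is defined here to keep the example self-contained):

```java
// Sketch: wrap the unexpected remote exception in a runtime exception with
// context, instead of rethrowing it raw from getOutputStream().
public class LogWriterErrorHandling {

  // Stand-in for org.apache.hudi.exception.HoodieException.
  public static class HoodieExceptionStandIn extends RuntimeException {
    public HoodieExceptionStandIn(String message, Throwable cause) {
      super(message, cause);
    }
  }

  public static RuntimeException wrapUnexpected(String logFile, Exception remoteException) {
    return new HoodieExceptionStandIn(
        "Failed to create log file " + logFile + " due to an unexpected remote exception",
        remoteException);
  }
}
```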



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/upgrade/TestUpgradeDowngrade.java:
##########
@@ -844,6 +847,47 @@ private Pair<List<HoodieRecord>, List<HoodieRecord>> twoUpsertCommitDataWithTwoP
     return Pair.of(records, records2);
   }
 
+  /**
+   * Since how markers are generated for log file changed in Version Six, we regenerate markers in the way version zero do.
+   *
+   * @param table instance of {@link HoodieTable}
+   */
+  private void prepForUpgradeFromZeroToOne(HoodieTable table) throws IOException {

Review Comment:
   Why do we need it here? The markers-for-log-files PR should have already covered this test, right? Is the 0 to 1 upgrade flaky?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/compact/plan/generators/BaseHoodieCompactionPlanGenerator.java:
##########
@@ -118,8 +120,21 @@ public HoodieCompactionPlan generateCompactionPlan() throws IOException {
         .getLatestFileSlices(partitionPath)
         .filter(slice -> filterFileSlice(slice, lastCompletedInstantTime, fgIdsInPendingCompactionAndClustering, instantRange))
         .map(s -> {
-          List<HoodieLogFile> logFiles =
-              s.getLogFiles().sorted(HoodieLogFile.getLogFileComparator()).collect(toList());
+          List<HoodieLogFile> logFiles = s.getLogFiles()
+              // ==============================================================
+              // IMPORTANT
+              // ==============================================================
+              // Currently, our filesystem view could return a file slice with pending log files there,
+              // these files should be excluded from the plan, let's say we have such a sequence of actions
+
+              // t10: a delta commit starts,
+              // t20: the compaction is scheduled and the t10 delta commit is still pending, and the "fg_10.log" is included in the plan
+              // t25: the delta commit 10 finishes,
+              // t30: the compaction execution starts, now the reader considers the log file "fg_10.log" as valid.
+
+              // for both OCC and NB-CC, this is in-correct.

Review Comment:
   Let's add a unit test to cover this scenario.
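   The t10..t30 sequence could be asserted with a small model like the following (`PendingLogFileFilter` is a hypothetical helper; the real plan generator filters `HoodieLogFile`s against the delta commits that had completed when the compaction was scheduled):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Model of the exclusion: a log file belongs in the compaction plan only if
// its delta commit had completed when the compaction was scheduled.
public class PendingLogFileFilter {

  public static List<String> filterLogFiles(List<String> logFileDeltaCommits,
                                            Set<String> completedDeltaCommits) {
    return logFileDeltaCommits.stream()
        .filter(completedDeltaCommits::contains)
        .collect(Collectors.toList());
  }
}
```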



##########
hudi-common/src/main/java/org/apache/hudi/common/table/timeline/CompletionTimeQueryView.java:
##########
@@ -16,27 +16,33 @@
  * limitations under the License.
  */
 
-package org.apache.hudi.client.timeline;
+package org.apache.hudi.common.table.timeline;
 
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieArchivedTimeline;
-import org.apache.hudi.common.table.timeline.HoodieInstant;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
 
 import org.apache.avro.generic.GenericRecord;
 
 import java.io.Serializable;
 import java.time.Instant;
 import java.util.Date;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
 import static org.apache.hudi.common.table.timeline.HoodieArchivedTimeline.COMPLETION_TIME_ARCHIVED_META_FIELD;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.EQUALS;
 import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN_OR_EQUALS;
+import static org.apache.hudi.common.table.timeline.HoodieTimeline.LESSER_THAN;
 
 /**
  * Query view for instant completion time.
  */
-public class CompletionTimeQueryView implements AutoCloseable {
+public class CompletionTimeQueryView implements AutoCloseable, Serializable {
+  private static final long serialVersionUID = 1L;
+
+  private static final long MILLI_SECONDS_IN_THREE_DAYS = 3 * 24 * 3600 * 1000;

Review Comment:
   Do we need some internal config to control this?



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -544,15 +543,27 @@ public static Option<Pair<Integer, String>> 
getLatestLogVersion(FileSystem fs, P
   }
 
   /**
-   * computes the next log version for the specified fileId in the partition 
path.
+   * Returns whether the given path exists on the filesystem or false if any 
exception occurs.
    */
-  public static int computeNextLogVersion(FileSystem fs, Path partitionPath, 
final String fileId,
-      final String logFileExtension, final String baseCommitTime) throws 
IOException {
-    Option<Pair<Integer, String>> currentVersionWithWriteToken =
-        getLatestLogVersion(fs, partitionPath, fileId, logFileExtension, 
baseCommitTime);
-    // handle potential overflow
-    return (currentVersionWithWriteToken.isPresent()) ? 
currentVersionWithWriteToken.get().getKey() + 1
-        : HoodieLogFile.LOGFILE_BASE_VERSION;
+  private static boolean fileExists(FileSystem fs, Path path) {
+    try {
+      return fs.exists(path);
+    } catch (Exception ignored) {
+      return false;
+    }
+  }
+
+  /**
+   * Computes the next log version for the specified fileId in the partition 
path.
+   */
+  public static int computeNextLogVersion(int curVersion, FileSystem fs, Path 
partitionPath, final String fileId,

Review Comment:
   not being used anywhere?



##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/timeline/TestCompletionTimeQueryView.java:
##########


Review Comment:
   let's move this test class to hudi-common as well? And also, add more tests for the new methods?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/view/AbstractTableFileSystemView.java:
##########
@@ -216,6 +233,9 @@ protected List<HoodieFileGroup> buildFileGroups(Stream<HoodieBaseFile> baseFileS
           group.addNewFileSliceAtInstant(pendingCompaction.get().getKey());
         }
       }
+      if (logFiles.containsKey(pair)) {
+        logFiles.get(pair).stream().sorted(HoodieLogFile.getLogFileComparator()).forEach(logFile -> group.addLogFile(completionTimeQueryView, logFile));
+      }
+      }

Review Comment:
   OK, I see. Here it's being sorted, so log files will be added in sequence of deltacommit time. As long as there is one caller it's fine. But how do we ensure that other callers don't abuse it? Should we then add a validation in `HoodieFileGroup#addLogFile`?
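   A minimal sketch of the validation being suggested (`OrderedLogFileList` is a hypothetical stand-in; the real method would compare the incoming `HoodieLogFile`'s delta commit time against the last one added):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical guard: reject log files added out of delta-commit order, so
// callers other than the filesystem view cannot break the ordering contract.
public class OrderedLogFileList {
  private final List<String> deltaCommitTimes = new ArrayList<>();

  public void addLogFile(String deltaCommitTime) {
    if (!deltaCommitTimes.isEmpty()
        && deltaCommitTimes.get(deltaCommitTimes.size() - 1).compareTo(deltaCommitTime) > 0) {
      throw new IllegalArgumentException("Log file with delta commit "
          + deltaCommitTime + " added out of order");
    }
    deltaCommitTimes.add(deltaCommitTime);
  }
}
```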



##########
hudi-common/src/main/java/org/apache/hudi/common/util/HoodieRecordUtils.java:
##########
@@ -109,4 +110,11 @@ public static boolean recordTypeCompatibleEngine(HoodieRecordType recordType, En
   public static HoodieRecordMerger mergerToPreCombineMode(HoodieRecordMerger merger) {
     return merger instanceof OperationModeAwareness ? ((OperationModeAwareness) merger).asPreCombiningMode() : merger;
   }
+
+  public static String getCurrentLocationInstant(HoodieRecord<?> record) {
+    if (record.getCurrentLocation() != null) {
+      return record.getCurrentLocation().getInstantTime();
+    }
+    return null;

Review Comment:
   return an `Option`?
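   Sketch of the Option-returning variant (`java.util.Optional` stands in for `org.apache.hudi.common.util.Option`; `Record`/`Location` are minimal stand-ins for HoodieRecord/HoodieRecordLocation):

```java
import java.util.Optional;

public class HoodieRecordUtilsSketch {

  public static class Location {
    public final String instantTime;
    public Location(String instantTime) { this.instantTime = instantTime; }
  }

  public static class Record {
    public final Location currentLocation;
    public Record(Location currentLocation) { this.currentLocation = currentLocation; }
  }

  // Empty Optional replaces the null return, so callers cannot forget the check.
  public static Optional<String> getCurrentLocationInstant(Record record) {
    return Optional.ofNullable(record.currentLocation).map(loc -> loc.instantTime);
  }
}
```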



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -516,10 +516,13 @@ public void testMultiWriterWithAsyncTableServicesWithConflict(HoodieTableType ta
       // We want the upsert to go through only after the compaction
       // and cleaning schedule completion. So, waiting on latch here.
       latchCountDownAndWait(scheduleCountDownLatch, 30000);
-      if (tableType == HoodieTableType.MERGE_ON_READ) {
-        // Since the compaction already went in, this upsert has
+      if (tableType == HoodieTableType.MERGE_ON_READ && !(resolutionStrategy instanceof PreferWriterConflictResolutionStrategy)) {
+        // HUDI-6897: Improve SimpleConcurrentFileWritesConflictResolutionStrategy for NB-CC

Review Comment:
   This should be an immediate followup. Let's fix the constraints in the resolution strategy.



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/action/rollback/TestMergeOnReadRollbackActionExecutor.java:
##########
@@ -126,21 +126,19 @@ public void testMergeOnReadRollbackActionExecutor(boolean isUsingMarkers) throws
     for (Map.Entry<String, HoodieRollbackPartitionMetadata> entry : rollbackMetadata.entrySet()) {
       HoodieRollbackPartitionMetadata meta = entry.getValue();
       assertEquals(0, meta.getFailedDeleteFiles().size());
-      assertEquals(0, meta.getSuccessDeleteFiles().size());
+      assertEquals(1, meta.getSuccessDeleteFiles().size());

Review Comment:
   Not following the logic here... why should there be 1 successful delete at this stage?



##########
hudi-common/src/main/java/org/apache/hudi/common/fs/FSUtils.java:
##########
@@ -544,15 +543,40 @@ public static Option<Pair<Integer, String>> getLatestLogVersion(FileSystem fs, P
   }
 
   /**
-   * computes the next log version for the specified fileId in the partition path.
+   * Get the latest log version for the fileId in the partition path.
+   */
+  public static int getLatestLogVersion(FileSystem fs, Path partitionPath, final String fileId,
+                                        final String logFileExtension, final String deltaCommitTime, final String writeToken) throws IOException {
+    int version = HoodieLogFile.LOGFILE_BASE_VERSION;
+    String logFileName = makeLogFileName(fileId, logFileExtension, deltaCommitTime, version, writeToken);
+    while (fileExists(fs, new Path(partitionPath, logFileName))) {

Review Comment:
   Yes, we should not need the exists check in `HoodieLogFormatWriter.getOutputStream`. Go ahead with creating the new log file; if it already exists (which will be known from the filesystem exception thrown upon creation), just roll over. Let's remove all `fileExists` usages.
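   The create-then-rollover pattern being suggested can be sketched self-contained with `java.nio.file` (the `fileId.version.log` naming is a simplification of Hudi's real log file names, and `CreateOrRollover` is a hypothetical helper):

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;

// Sketch: drop the exists() pre-check, attempt an atomic create, and roll
// the version over on collision.
public class CreateOrRollover {

  public static Path createNextLogFile(Path partitionPath, String fileId, int startVersion) {
    int version = startVersion;
    while (true) {
      Path candidate = partitionPath.resolve(fileId + "." + version + ".log");
      try {
        // Atomic create-if-absent: throws FileAlreadyExistsException on collision.
        return Files.createFile(candidate);
      } catch (FileAlreadyExistsException collision) {
        version++; // rollover instead of an upfront exists() scan
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }
}
```

   This turns the exists-then-create race into a single atomic operation per attempt, which is the point of removing the `fileExists` pre-checks.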



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
