yihua commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1023177121


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -62,6 +63,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem 
fs) {
     metrics = new HoodieLockMetrics(writeConfig);
   }
 
+  /**
+   * Try to have a lock at partitionPath + fileID level for different write 
handler.
+   * @param writeConfig
+   * @param fs
+   * @param partitionPath
+   * @param fileId
+   */
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String 
partitionPath, String fileId) {

Review Comment:
   Does this constructor also need to initialize `metrics`?
   `metrics = new HoodieLockMetrics(writeConfig);`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -62,6 +63,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem 
fs) {
     metrics = new HoodieLockMetrics(writeConfig);
   }
 
+  /**
+   * Try to have a lock at partitionPath + fileID level for different write 
handler.
+   * @param writeConfig
+   * @param fs
+   * @param partitionPath
+   * @param fileId
+   */
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String 
partitionPath, String fileId) {
+    this.writeConfig = writeConfig;
+    this.hadoopConf = new SerializableConfiguration(fs.getConf());
+    TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" 
+ fileId);
+    this.lockConfiguration = new LockConfiguration(props);
+    maxRetries = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+        
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
+    maxWaitTimeInMs = 
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+        
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));

Review Comment:
   nit: extract the common init logic into a method instead of copying the code?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -62,6 +63,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem 
fs) {
     metrics = new HoodieLockMetrics(writeConfig);
   }
 
+  /**
+   * Try to have a lock at partitionPath + fileID level for different write 
handler.
+   * @param writeConfig
+   * @param fs
+   * @param partitionPath
+   * @param fileId
+   */
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String 
partitionPath, String fileId) {
+    this.writeConfig = writeConfig;
+    this.hadoopConf = new SerializableConfiguration(fs.getConf());
+    TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" 
+ fileId);
+    this.lockConfiguration = new LockConfiguration(props);
+    maxRetries = 
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+        
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
+    maxWaitTimeInMs = 
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+        
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
+  }
+
+  /**
+   * rebuild lock related configs, only support ZK related lock for now.
+   */
+  private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, 
String key) {
+    TypedProperties props = new TypedProperties(writeConfig.getProps());
+    props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key);

Review Comment:
   Here it should check if the ZK-based lock is configured.  Otherwise, it 
should throw an exception.
   
   Generally, we should think about how to support different lock provider 
implementations.  For the first cut, it may be okay to have this specific logic 
here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -189,6 +190,33 @@ public class HoodieLockConfig extends HoodieConfig {
       .withDocumentation("Lock provider class name, this should be subclass of 
"
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
 
+  // Pluggable strategies to use when early conflict detection
+  public static final ConfigProperty<String> 
EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+      .key(LOCK_PREFIX + "early.conflict.detection.strategy")
+      
.defaultValue(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName())
+      .sinceVersion("0.12.0")
+      .withDocumentation("Early conflict detection class name, this should be 
subclass of "
+          + 
"org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy");
+
+  public static final ConfigProperty<Boolean> EARLY_CONFLICT_DETECTION_ENABLE 
= ConfigProperty
+      .key(LOCK_PREFIX + "early.conflict.detection.enable")
+      .defaultValue(false)
+      .sinceVersion("0.12.0")

Review Comment:
   nit: now this should be planned for `0.13.0` :)



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -67,6 +80,14 @@ public void endTransaction(Option<HoodieInstant> 
currentTxnOwnerInstant) {
     }
   }
 
+  public void endTransaction(String filePath) {

Review Comment:
   Align the argument with `beginTransaction()`, using the same set of 
arguments?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -189,6 +190,33 @@ public class HoodieLockConfig extends HoodieConfig {
       .withDocumentation("Lock provider class name, this should be subclass of 
"
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
 
+  // Pluggable strategies to use when early conflict detection
+  public static final ConfigProperty<String> 
EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+      .key(LOCK_PREFIX + "early.conflict.detection.strategy")
+      
.defaultValue(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName())

Review Comment:
   Should this depend on the marker type used with `.withInferFunction()`?  An 
example of `withInferFunction()`:
   ```
     public static final ConfigProperty<String> DYNAMODB_LOCK_PARTITION_KEY = 
ConfigProperty
         .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "partition_key")
         .noDefaultValue()
         .sinceVersion("0.10.0")
         .withInferFunction(cfg -> {
           if (cfg.contains(HoodieTableConfig.NAME)) {
             return Option.of(cfg.getString(HoodieTableConfig.NAME));
           }
           return Option.empty();
         })
         .withDocumentation("For DynamoDB based lock provider, the partition 
key for the DynamoDB lock table. "
                            + "Each Hudi dataset should has it's unique key so 
concurrent writers could refer to the same partition key."
                            + " By default we use the Hudi table name specified 
to be the partition key");
   ```



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -187,8 +194,54 @@ protected Path makeNewFilePath(String partitionPath, 
String fileName) {
    * @param partitionPath Partition path
    */
   protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType());
+    WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    // do early conflict detection before create markers.
+    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+        && config.isEarlyConflictDetectionEnable()) {
+      HoodieEarlyConflictDetectionStrategy earlyConflictDetectionStrategy = 
config.getEarlyConflictDetectionStrategy();
+      if (earlyConflictDetectionStrategy instanceof 
HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy) {
+        createMarkerWithTransaction(earlyConflictDetectionStrategy, 
writeMarkers, partitionPath, dataFileName);
+      } else {
+        createMarkerWithEarlyConflictDetection(earlyConflictDetectionStrategy, 
writeMarkers, partitionPath, dataFileName);
+      }
+    } else {
+      // create marker directly
+      writeMarkers.create(partitionPath, dataFileName, getIOType());
+    }
+  }
+
+  private Option<Path> 
createMarkerWithEarlyConflictDetection(HoodieEarlyConflictDetectionStrategy 
resolutionStrategy,
+                                                              WriteMarkers 
writeMarkers,
+                                                              String 
partitionPath,
+                                                              String 
dataFileName) {
+    Set<HoodieInstant> completedCommitInstants = 
hoodieTable.getMetaClient().getActiveTimeline()

Review Comment:
   I think the `hoodieTable` here should be initialized at the beginning of the 
transaction and used throughout the transaction.  Let's make sure that's the 
case, to make sure it always returns the snapshot of the timeline at the 
beginning of the transaction (later concurrent writes should not leak in), so 
to guarantee the correct behavior for conflict detection.



##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class HoodieDirectMarkerBasedEarlyConflictDetectionStrategy 
implements HoodieEarlyConflictDetectionStrategy {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+
+  public abstract boolean hasMarkerConflict(String basePath, FileSystem fs, 
String partitionPath, String dataFileName, String instantTime,
+                                            Set<HoodieInstant> 
completedCommitInstants, HoodieTableMetaClient metaClient);
+
+  public abstract void resolveMarkerConflict(String basePath, String 
partitionPath, String dataFileName);
+
+  /**
+   * We need to do list operation here.
+   * In order to reduce the list pressure as much as possible, first we build 
path prefix in advance:  '$base_path/.temp/instant_time/partition_path',
+   * and only list these specific partition_paths we need instead of list all 
the '$base_path/.temp/'
+   * @param basePath
+   * @param partitionPath
+   * @param fileId 162b13d7-9530-48cf-88a4-02241817ae0c-0_1-74-100_003.parquet
+   * @return true if current fileID is already existed under 
.temp/instant_time/partition_path/..
+   * @throws IOException
+   */
+  public boolean checkMarkerConflict(String basePath, String partitionPath, 
String fileId,
+                                      FileSystem fs, String instantTime) 
throws IOException {
+    String tempFolderPath = getTempFolderPath(basePath);
+    long res = Arrays.stream(fs.listStatus(new Path(tempFolderPath)))
+        .parallel()
+        .map(FileStatus::getPath)
+        .filter(markerPath -> {
+          return !markerPath.getName().equalsIgnoreCase(instantTime);
+        })

Review Comment:
   Could we somehow reuse `HoodieTableMetaClient::getMarkerFolderPath(String 
instantTs)` to avoid one list call?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -187,8 +194,54 @@ protected Path makeNewFilePath(String partitionPath, 
String fileName) {
    * @param partitionPath Partition path
    */
   protected void createMarkerFile(String partitionPath, String dataFileName) {
-    WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
-        .create(partitionPath, dataFileName, getIOType());
+    WriteMarkers writeMarkers = 
WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+    // do early conflict detection before create markers.
+    if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+        && config.isEarlyConflictDetectionEnable()) {
+      HoodieEarlyConflictDetectionStrategy earlyConflictDetectionStrategy = 
config.getEarlyConflictDetectionStrategy();
+      if (earlyConflictDetectionStrategy instanceof 
HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy) {
+        createMarkerWithTransaction(earlyConflictDetectionStrategy, 
writeMarkers, partitionPath, dataFileName);
+      } else {
+        createMarkerWithEarlyConflictDetection(earlyConflictDetectionStrategy, 
writeMarkers, partitionPath, dataFileName);
+      }
+    } else {
+      // create marker directly
+      writeMarkers.create(partitionPath, dataFileName, getIOType());
+    }

Review Comment:
   Should we move this logic into specific `WriteMarkers` implementations, 
i.e., direct marker-based strategy into DirectWriteMarkers, and async timeline 
server-based strategy into TimelineServerBasedWriteMarkers?  Because early 
conflict detection is tightly coupled with the marker creation process and the 
type of markers. 



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -109,6 +126,7 @@ public void stop() {
     }
     dispatchingExecutorService.shutdown();
     batchingExecutorService.shutdown();
+    checkers.values().forEach(ExecutorService::shutdown);

Review Comment:
   Clear the checkers map?



##########
hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java:
##########
@@ -208,10 +216,56 @@ public static Set<String> readMarkersFromFile(Path 
markersFilePath, Serializable
       fsDataInputStream = fs.open(markersFilePath);
       markers = new 
HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to read MARKERS file " + 
markersFilePath, e);
+      if (ignoreException) {
+        LOG.warn("IOException occurs during read MARKERS file, ", e);
+      } else {
+        throw new HoodieIOException("Failed to read MARKERS file " + 
markersFilePath, e);
+      }
     } finally {
       closeQuietly(fsDataInputStream);
     }
     return markers;
   }
+
+  /**
+   * Reads files containing the markers written by timeline-server-based 
marker mechanism locally instead of using cluster Context.
+   *
+   * @param markerDir   marker directory.
+   * @param fileSystem  file system to use.
+   * @return A {@code Map} of file name to the set of markers stored in the 
file.
+   */
+  public static Set<String> 
readTimelineServerBasedMarkersFromFileSystemLocally(String markerDir, 
FileSystem fileSystem) {

Review Comment:
   You avoid adding this method by reusing the following method with local 
engine context.
   ```
   public static Map<String, Set<String>> 
readTimelineServerBasedMarkersFromFileSystem(
         String markerDir, FileSystem fileSystem, HoodieEngineContext context, 
int parallelism)
   ```



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +168,50 @@ public boolean doesMarkerDirExist(String markerDir) {
    * @param markerName marker name
    * @return the {@code CompletableFuture} instance for the request
    */
-  public CompletableFuture<String> createMarker(Context context, String 
markerDir, String markerName) {
+  public CompletableFuture<String> createMarker(Context context, String 
markerDir, String markerName,
+                                                String batchInterval, String 
period, String maxAllowableHeartbeatIntervalInMs,
+                                                String basePath, String 
earlyConflictDetectionEnable,
+                                                String 
earlyConflictDetectionClassName) {
+    // Step1 do early conflict detection if enable
+    if (Boolean.parseBoolean(earlyConflictDetectionEnable)) {
+      try {
+        synchronized (earlyConflictDetectionLock) {
+          if (earlyConflictDetectionStrategy == null) {
+            earlyConflictDetectionStrategy = 
ReflectionUtils.loadClass(earlyConflictDetectionClassName);
+          }
+
+          if (!markerDir.equalsIgnoreCase(currentMarkerDir)) {
+            this.currentMarkerDir = markerDir;
+            Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
+            Set<HoodieInstant> oldInstants = 
viewManager.getFileSystemView(basePath)
+                .getTimeline()
+                .filterCompletedInstants()
+                .filter(instant -> actions.contains(instant.getAction()))
+                .getInstants()
+                .collect(Collectors.toSet());
+
+            earlyConflictDetectionStrategy.fresh(batchInterval, period, 
markerDir, basePath, maxAllowableHeartbeatIntervalInMs, fileSystem,
+                this, oldInstants);
+          }
+        }
+
+        if (earlyConflictDetectionStrategy.hasMarkerConflict()) {
+          earlyConflictDetectionStrategy.resolveMarkerConflict(basePath, 
markerDir, markerName);

Review Comment:
   No exception should be thrown here at the timeline server if there is 
detected conflict.  The timeline server should simply return false for the 
marker creation request and let the executor/write handle resolve the marker 
conflict (throw the exception).



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -132,14 +143,54 @@ protected Option<Path> create(String partitionPath, 
String dataFileName, IOType
     HoodieTimer timer = HoodieTimer.start();
     String markerFileName = getMarkerFileName(dataFileName, type);
 
-    Map<String, String> paramsMap = new HashMap<>();
+    Map<String, String> paramsMap = initConfigMap(partitionPath, 
markerFileName);
+    boolean success = executeCreateMarkerRequest(paramsMap, partitionPath, 
markerFileName);
+    LOG.info("[timeline-server-based] Created marker file " + partitionPath + 
"/" + markerFileName
+        + " in " + timer.endTimer() + " ms");
+    if (success) {
+      return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, 
partitionPath), markerFileName));
+    } else {
+      return Option.empty();
+    }
+  }
+
+  @Override
+  public Option<Path> createWithEarlyConflictDetection(String partitionPath, 
String dataFileName, IOType type, boolean checkIfExists,
+                                                       
HoodieEarlyConflictDetectionStrategy resolutionStrategy,
+                                                       Set<HoodieInstant> 
completedCommitInstants, HoodieWriteConfig config, String fileId) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    String markerFileName = getMarkerFileName(dataFileName, type);
+    Map<String, String> paramsMap = initConfigMap(partitionPath, 
markerFileName);
+
+    paramsMap.put(MARKER_CONFLICT_CHECKER_BATCH_INTERVAL, 
config.getMarkerConflictCheckerBatchInterval());
+    paramsMap.put(MARKER_CONFLICT_CHECKER_PERIOD, 
config.getMarkerConflictCheckerPeriod());

Review Comment:
   nit: group the `MARKER_CONFLICT_CHECKER_*` params together



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -189,6 +190,33 @@ public class HoodieLockConfig extends HoodieConfig {
       .withDocumentation("Lock provider class name, this should be subclass of 
"
           + "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
 
+  // Pluggable strategies to use when early conflict detection
+  public static final ConfigProperty<String> 
EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+      .key(LOCK_PREFIX + "early.conflict.detection.strategy")
+      
.defaultValue(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName())
+      .sinceVersion("0.12.0")
+      .withDocumentation("Early conflict detection class name, this should be 
subclass of "
+          + 
"org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy");
+
+  public static final ConfigProperty<Boolean> EARLY_CONFLICT_DETECTION_ENABLE 
= ConfigProperty
+      .key(LOCK_PREFIX + "early.conflict.detection.enable")
+      .defaultValue(false)
+      .sinceVersion("0.12.0")
+      .withDocumentation("Enable early conflict detection based on markers. It 
will try to detect writing conflict before create markers and fast fail"
+          + " which will release cluster resources as soon as possible.");
+
+  public static final ConfigProperty<Long> 
MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = ConfigProperty
+      .key(LOCK_PREFIX + "early.conflict.async.checker.batch.interval")
+      .defaultValue(30000L)
+      .sinceVersion("0.12.0")
+      .withDocumentation("Used for timeline based marker 
AsyncTimelineMarkerConflictResolutionStrategy. The time to delay first async 
marker conflict checking.");
+
+  public static final ConfigProperty<Long> MARKER_CONFLICT_CHECKER_PERIOD = 
ConfigProperty
+      .key(LOCK_PREFIX + "early.conflict.async.checker.period")
+      .defaultValue(30000L)
+      .sinceVersion("0.12.0")
+      .withDocumentation("Used for timeline based marker 
AsyncTimelineMarkerConflictResolutionStrategy. The period between each marker 
conflict checking.");

Review Comment:
   Instead of using the prefix of `hoodie.write.lock.` (`LOCK_PREFIX`), we 
should use `hoodie.write.concurrency.` (creating a new constant String for 
that) and put these configs into `HoodieWriteConfig`, since the early conflict 
detection falls under the concurrency control. 



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.handlers.MarkerHandler;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class MarkerCheckerRunnable implements Runnable {
+  private static final Logger LOG = 
LogManager.getLogger(MarkerCheckerRunnable.class);
+
+  private MarkerHandler markerHandler;
+  private String markerDir;
+  private String basePath;
+  private FileSystem fs;
+  private AtomicBoolean hasConflict;
+  private long maxAllowableHeartbeatIntervalInMs;
+  private Set<HoodieInstant> oldInstants;
+
+  public MarkerCheckerRunnable(AtomicBoolean hasConflict, MarkerHandler 
markerHandler, String markerDir,
+                               String basePath, FileSystem fileSystem, long 
maxAllowableHeartbeatIntervalInMs,
+                               Set<HoodieInstant> oldInstants) {
+    this.markerHandler = markerHandler;
+    this.markerDir = markerDir;
+    this.basePath = basePath;
+    this.fs = fileSystem;
+    this.hasConflict = hasConflict;
+    this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
+    this.oldInstants = oldInstants;
+  }
+
+  @Override
+  public void run() {
+    try {
+      if (!fs.exists(new Path(markerDir))) {
+        return;
+      }
+
+      HoodieTimer timer = new HoodieTimer().startTimer();
+      Set<String> currentInstantAllMarkers = 
markerHandler.getAllMarkers(markerDir);
+      Path tempPath = new Path(basePath + Path.SEPARATOR + 
HoodieTableMetaClient.TEMPFOLDER_NAME);
+
+      List<Path> instants = MarkerUtils.getAllMarkerDir(tempPath, fs);
+      List<String> candidate = getCandidateInstants(instants, 
markerDirToInstantTime(markerDir));
+      Set<String> tableMarkers = candidate.stream().flatMap(instant -> {
+        return 
MarkerUtils.readTimelineServerBasedMarkersFromFileSystemLocally(instant, 
fs).stream();
+      }).collect(Collectors.toSet());
+
+      Set<String> currentFileIDs = 
currentInstantAllMarkers.stream().map(this::makerToPartitionAndFileID).collect(Collectors.toSet());
+      Set<String> tableFilesIDs = 
tableMarkers.stream().map(this::makerToPartitionAndFileID).collect(Collectors.toSet());
+
+      currentFileIDs.retainAll(tableFilesIDs);
+
+      if (!currentFileIDs.isEmpty() || 
hasCommitConflict(currentInstantAllMarkers, basePath)) {
+        LOG.warn("Conflict writing detected based on markers!\n"
+            + "Conflict markers: " + currentInstantAllMarkers + "\n"
+            + "Table markers: " + tableMarkers);
+        hasConflict.compareAndSet(false, true);
+      }
+      LOG.info("Finish batch marker checker in " + timer.endTimer() + " ms");
+
+    } catch (IOException e) {
+      throw new HoodieIOException("IOException occurs during checking marker 
conflict");
+    }
+  }
+
+  /**
+   * Get Candidate Instant to do conflict checking:
+   * 1. Skip current writer related instant(currentInstantTime)
+   * 2. Skip all instants after currentInstantTime
+   * 3. Skip dead writers related instants based on heart-beat
+   * @param instants
+   * @return
+   */
+  private List<String> getCandidateInstants(List<Path> instants, String 
currentInstantTime) {

Review Comment:
   Can we adapt the common logic from `ConflictResolutionStrategy` instead of 
reinventing similar logic?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -132,14 +143,54 @@ protected Option<Path> create(String partitionPath, 
String dataFileName, IOType
     HoodieTimer timer = HoodieTimer.start();
     String markerFileName = getMarkerFileName(dataFileName, type);
 
-    Map<String, String> paramsMap = new HashMap<>();
+    Map<String, String> paramsMap = initConfigMap(partitionPath, 
markerFileName);
+    boolean success = executeCreateMarkerRequest(paramsMap, partitionPath, 
markerFileName);
+    LOG.info("[timeline-server-based] Created marker file " + partitionPath + 
"/" + markerFileName
+        + " in " + timer.endTimer() + " ms");
+    if (success) {
+      return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, 
partitionPath), markerFileName));
+    } else {
+      return Option.empty();
+    }
+  }
+
+  @Override
+  public Option<Path> createWithEarlyConflictDetection(String partitionPath, 
String dataFileName, IOType type, boolean checkIfExists,
+                                                       
HoodieEarlyConflictDetectionStrategy resolutionStrategy,
+                                                       Set<HoodieInstant> 
completedCommitInstants, HoodieWriteConfig config, String fileId) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    String markerFileName = getMarkerFileName(dataFileName, type);
+    Map<String, String> paramsMap = initConfigMap(partitionPath, 
markerFileName);
+
+    paramsMap.put(MARKER_CONFLICT_CHECKER_BATCH_INTERVAL, 
config.getMarkerConflictCheckerBatchInterval());
+    paramsMap.put(MARKER_CONFLICT_CHECKER_PERIOD, 
config.getMarkerConflictCheckerPeriod());
     paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath.toString());

Review Comment:
   this should be moved to `initConfigMap()` as well.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hadoop.fs.FileSystem;
+import 
org.apache.hudi.common.conflict.detection.HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ConcurrentModificationException;
+import java.util.Set;
+
+/**
+ * This strategy is used for direct marker writers, trying to do early 
conflict detection.
+ * It will use fileSystem api like list and exist directly to check if there 
is any marker file conflict.
+ */
+public class SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy 
extends HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy {
+  private static final Logger LOG = 
LogManager.getLogger(SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+
+  @Override
+  public boolean hasMarkerConflict(String basePath, FileSystem fs, String 
partitionPath, String fileId, String instantTime,

Review Comment:
   The logic here looks like the same as 
`SimpleDirectMarkerBasedEarlyConflictDetectionStrategy`.  Does it make sense to 
move the transaction management inside the strategy to make it easier to 
understand?



##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+public abstract class 
HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy

Review Comment:
   Do we still need this?



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -658,4 +740,56 @@ private JavaRDD<WriteStatus> 
startCommitForUpdate(HoodieWriteConfig writeConfig,
     assertNoWriteErrors(statuses);
     return result;
   }
+
+  public static Stream<Arguments> configParams() {
+    Object[][] data =
+        new Object[][] {{"COPY_ON_WRITE", 
MarkerType.TIMELINE_SERVER_BASED.name()}, {"MERGE_ON_READ", 
MarkerType.TIMELINE_SERVER_BASED.name()},
+            {"MERGE_ON_READ", MarkerType.DIRECT.name()}, {"COPY_ON_WRITE", 
MarkerType.DIRECT.name()}};
+    return Stream.of(data).map(Arguments::of);
+  }
+
+  private HoodieWriteConfig buildWriteConfigForEarlyConflictDetect(String 
markerType, Properties properties) {
+    if (markerType.equalsIgnoreCase(MarkerType.DIRECT.name())) {
+      return getConfigBuilder()
+          .withHeartbeatIntervalInMs(3600 * 1000)
+          .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+              .withStorageType(FileSystemViewStorageType.MEMORY)
+              
.withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+          .withCleanConfig(HoodieCleanConfig.newBuilder()
+              
.withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+              .withAutoClean(false).build())
+          .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+              .withAutoArchive(false).build())
+          
.withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+          .withMarkersType(MarkerType.DIRECT.name())
+          
.withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+              .withEarlyConflictDetectionEnable(true)
+              
.withEarlyConflictDetectionStrategy(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName())

Review Comment:
   For direct markers, should we also test 
`SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy`?



##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+public interface HoodieEarlyConflictDetectionStrategy {

Review Comment:
   Move common methods here?



##########
hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java:
##########
@@ -29,6 +29,12 @@ public class MarkerOperation implements Serializable {
 
   public static final String MARKER_DIR_PATH_PARAM = "markerdirpath";
   public static final String MARKER_NAME_PARAM = "markername";
+  public static final String MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = 
"batchinterval";
+  public static final String MARKER_CONFLICT_CHECKER_PERIOD = "period";
+  public static final String MARKER_CONFLICT_CHECKER_HEART_BEAT_INTERVAL = 
"heartbeatinterval";
+  public static final String MARKER_BASEPATH_PARAM = "basepath";
+  public static final String MARKER_CONFLICT_CHECKER_ENABLE = 
"HoodieEarlyConflictDetectionStrategy";
+  public static final String MARKER_CONFLICT_CHECKER_STRATEGY = 
"earlyconflictdetectionstrategy";

Review Comment:
   Let's add `marker` as the prefix for the parameter naming and avoid capital 
letters.



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +168,50 @@ public boolean doesMarkerDirExist(String markerDir) {
    * @param markerName marker name
    * @return the {@code CompletableFuture} instance for the request
    */
-  public CompletableFuture<String> createMarker(Context context, String 
markerDir, String markerName) {
+  public CompletableFuture<String> createMarker(Context context, String 
markerDir, String markerName,
+                                                String batchInterval, String 
period, String maxAllowableHeartbeatIntervalInMs,
+                                                String basePath, String 
earlyConflictDetectionEnable,
+                                                String 
earlyConflictDetectionClassName) {
+    // Step1 do early conflict detection if enable

Review Comment:
   Should the async conflict detection thread be started in the constructor of 
the `MarkerHandler`?  It does not make sense to start the checker for every 
request.  Anyway, the checker is doing the detection in a batching and async 
way.



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +168,50 @@ public boolean doesMarkerDirExist(String markerDir) {
    * @param markerName marker name
    * @return the {@code CompletableFuture} instance for the request
    */
-  public CompletableFuture<String> createMarker(Context context, String 
markerDir, String markerName) {
+  public CompletableFuture<String> createMarker(Context context, String 
markerDir, String markerName,
+                                                String batchInterval, String 
period, String maxAllowableHeartbeatIntervalInMs,
+                                                String basePath, String 
earlyConflictDetectionEnable,
+                                                String 
earlyConflictDetectionClassName) {
+    // Step1 do early conflict detection if enable
+    if (Boolean.parseBoolean(earlyConflictDetectionEnable)) {
+      try {
+        synchronized (earlyConflictDetectionLock) {
+          if (earlyConflictDetectionStrategy == null) {
+            earlyConflictDetectionStrategy = 
ReflectionUtils.loadClass(earlyConflictDetectionClassName);
+          }

Review Comment:
   init this in constructor?



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +168,50 @@ public boolean doesMarkerDirExist(String markerDir) {
    * @param markerName marker name
    * @return the {@code CompletableFuture} instance for the request
    */
-  public CompletableFuture<String> createMarker(Context context, String 
markerDir, String markerName) {
+  public CompletableFuture<String> createMarker(Context context, String 
markerDir, String markerName,
+                                                String batchInterval, String 
period, String maxAllowableHeartbeatIntervalInMs,
+                                                String basePath, String 
earlyConflictDetectionEnable,
+                                                String 
earlyConflictDetectionClassName) {
+    // Step1 do early conflict detection if enable
+    if (Boolean.parseBoolean(earlyConflictDetectionEnable)) {
+      try {
+        synchronized (earlyConflictDetectionLock) {
+          if (earlyConflictDetectionStrategy == null) {
+            earlyConflictDetectionStrategy = 
ReflectionUtils.loadClass(earlyConflictDetectionClassName);
+          }
+
+          if (!markerDir.equalsIgnoreCase(currentMarkerDir)) {
+            this.currentMarkerDir = markerDir;
+            Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, 
DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
+            Set<HoodieInstant> oldInstants = 
viewManager.getFileSystemView(basePath)
+                .getTimeline()
+                .filterCompletedInstants()
+                .filter(instant -> actions.contains(instant.getAction()))
+                .getInstants()
+                .collect(Collectors.toSet());
+
+            earlyConflictDetectionStrategy.fresh(batchInterval, period, 
markerDir, basePath, maxAllowableHeartbeatIntervalInMs, fileSystem,
+                this, oldInstants);
+          }
+        }
+
+        if (earlyConflictDetectionStrategy.hasMarkerConflict()) {
+          earlyConflictDetectionStrategy.resolveMarkerConflict(basePath, 
markerDir, markerName);
+        }

Review Comment:
   Only this should be kept when doing batch marker creation in 
`MarkerDirState::processMarkerCreationRequests`.



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.handlers.MarkerHandler;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class MarkerCheckerRunnable implements Runnable {
+  private static final Logger LOG = 
LogManager.getLogger(MarkerCheckerRunnable.class);
+
+  private MarkerHandler markerHandler;
+  private String markerDir;
+  private String basePath;
+  private FileSystem fs;
+  private AtomicBoolean hasConflict;
+  private long maxAllowableHeartbeatIntervalInMs;
+  private Set<HoodieInstant> oldInstants;
+
+  public MarkerCheckerRunnable(AtomicBoolean hasConflict, MarkerHandler 
markerHandler, String markerDir,

Review Comment:
   Is it easy to add a unit test for this checker?



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.handlers.MarkerHandler;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class MarkerCheckerRunnable implements Runnable {
+  private static final Logger LOG = 
LogManager.getLogger(MarkerCheckerRunnable.class);
+
+  private MarkerHandler markerHandler;
+  private String markerDir;
+  private String basePath;
+  private FileSystem fs;
+  private AtomicBoolean hasConflict;
+  private long maxAllowableHeartbeatIntervalInMs;
+  private Set<HoodieInstant> oldInstants;
+
+  public MarkerCheckerRunnable(AtomicBoolean hasConflict, MarkerHandler 
markerHandler, String markerDir,
+                               String basePath, FileSystem fileSystem, long 
maxAllowableHeartbeatIntervalInMs,
+                               Set<HoodieInstant> oldInstants) {
+    this.markerHandler = markerHandler;
+    this.markerDir = markerDir;
+    this.basePath = basePath;
+    this.fs = fileSystem;
+    this.hasConflict = hasConflict;
+    this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
+    this.oldInstants = oldInstants;
+  }
+
+  @Override
+  public void run() {
+    try {
+      if (!fs.exists(new Path(markerDir))) {
+        return;
+      }
+
+      HoodieTimer timer = new HoodieTimer().startTimer();

Review Comment:
   nit: use `HoodieTimer.start()`;



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.handlers.MarkerHandler;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class MarkerCheckerRunnable implements Runnable {

Review Comment:
   nit: rename to `MarkerBasedEarlyConflictDetectionRunnable`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to