yihua commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1063209840


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -549,6 +553,50 @@ public class HoodieWriteConfig extends HoodieConfig {
      .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. "
          + "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation");
 
+  // Pluggable strategies to use when early conflict detection
+  public static final ConfigProperty<String> EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+      .key(CONCURRENCY_PREFIX + "early.conflict.detection.strategy")
+      .noDefaultValue()
+      .sinceVersion("0.13.0")
+      .withInferFunction(cfg -> {
+        MarkerType markerType = MarkerType.valueOf(cfg.getString(MARKERS_TYPE).toUpperCase());
+        switch (markerType) {
+          case DIRECT:
+            return Option.of(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName());
+          case TIMELINE_SERVER_BASED:
+          default:
+            return Option.of(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName());
+        }
+      })
+      .withDocumentation("Early conflict detection class name, this should be subclass of "
+          + "org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy");
+
+  public static final ConfigProperty<Boolean> EARLY_CONFLICT_DETECTION_ENABLE = ConfigProperty
+      .key(CONCURRENCY_PREFIX + "early.conflict.detection.enable")
+      .defaultValue(false)
+      .sinceVersion("0.13.0")
+      .withDocumentation("Enable early conflict detection based on markers. It will try to detect writing conflict before create markers and fast fail"
+          + " which will release cluster resources as soon as possible.");
+
+  public static final ConfigProperty<Long> MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = ConfigProperty

Review Comment:
   Add the unit for the time configs (e.g., `ms`) in both the key name and the documentation.
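A minimal sketch of what the reviewer is asking for. The key name below is illustrative only (the final Hudi config name is decided in the PR); the point is that the time unit appears in the key itself:

```java
// Illustrative only: a time-based config whose key carries its unit (ms),
// so readers of hoodie.write.concurrency.* keys never have to guess.
class TimeConfigSketch {
  static final String CONCURRENCY_PREFIX = "hoodie.write.concurrency.";

  // Hypothetical key; mirrors the batch-interval config under review.
  static String batchIntervalKey() {
    return CONCURRENCY_PREFIX + "async.conflict.detector.batch.interval.ms";
  }

  public static void main(String[] args) {
    System.out.println(batchIntervalKey());
  }
}
```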



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -148,13 +185,30 @@ protected Option<Path> create(String partitionPath, String dataFileName, IOType
     } catch (IOException e) {
       throw new HoodieRemoteException("Failed to create marker file " + partitionPath + "/" + markerFileName, e);
     }
-    LOG.info("[timeline-server-based] Created marker file " + partitionPath + "/" + markerFileName
-        + " in " + timer.endTimer() + " ms");
-    if (success) {
-      return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName));
+    return success;
+  }
+
+  /**
+   * init create marker related config maps.
+   * @param partitionPath
+   * @param markerFileName
+   * @return
+   */
+  private Map<String, String> initConfigMap(String partitionPath, String markerFileName, boolean initEarlyConflictConfigs) {

Review Comment:
   nit: `initEarlyConflictConfigs` -> `initConflictDetectionConfigs`



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -549,6 +553,50 @@ public class HoodieWriteConfig extends HoodieConfig {
      .withDocumentation("When table is upgraded from pre 0.12 to 0.12, we check for \"default\" partition and fail if found one. "
          + "Users are expected to rewrite the data in those partitions. Enabling this config will bypass this validation");
 
+  // Pluggable strategies to use when early conflict detection
+  public static final ConfigProperty<String> EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+      .key(CONCURRENCY_PREFIX + "early.conflict.detection.strategy")
+      .noDefaultValue()
+      .sinceVersion("0.13.0")
+      .withInferFunction(cfg -> {
+        MarkerType markerType = MarkerType.valueOf(cfg.getString(MARKERS_TYPE).toUpperCase());
+        switch (markerType) {
+          case DIRECT:
+            return Option.of(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName());
+          case TIMELINE_SERVER_BASED:
+          default:
+            return Option.of(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName());
+        }
+      })
+      .withDocumentation("Early conflict detection class name, this should be subclass of "
+          + "org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy");
+
+  public static final ConfigProperty<Boolean> EARLY_CONFLICT_DETECTION_ENABLE = ConfigProperty
+      .key(CONCURRENCY_PREFIX + "early.conflict.detection.enable")
+      .defaultValue(false)
+      .sinceVersion("0.13.0")
+      .withDocumentation("Enable early conflict detection based on markers. It will try to detect writing conflict before create markers and fast fail"
+          + " which will release cluster resources as soon as possible.");
+
+  public static final ConfigProperty<Long> MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = ConfigProperty

Review Comment:
   naming: `checker` -> `detector` in all places.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -43,23 +47,52 @@
 public class LockManager implements Serializable, AutoCloseable {
 
   private static final Logger LOG = LogManager.getLogger(LockManager.class);
-  private final HoodieWriteConfig writeConfig;
-  private final LockConfiguration lockConfiguration;
-  private final SerializableConfiguration hadoopConf;
-  private final int maxRetries;
-  private final long maxWaitTimeInMs;
+  private HoodieWriteConfig writeConfig;
+  private LockConfiguration lockConfiguration;
+  private SerializableConfiguration hadoopConf;
+  private int maxRetries;
+  private long maxWaitTimeInMs;

Review Comment:
   These should still be final.
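One way to keep the fields `final` while still supporting a second constructor (a sketch, not the actual `LockManager` code): have every public constructor compute its properties up front and delegate to a single canonical constructor, instead of assigning through a mutable `init` method.

```java
import java.util.Properties;

// Sketch: both constructors funnel into one canonical constructor,
// so every field can stay final.
class LockManagerSketch {
  private final Properties lockProps;
  private final int maxRetries;

  LockManagerSketch(Properties props) {
    this(props, 3);
  }

  // Hypothetical file-level variant: derives the adjusted props first,
  // then delegates; no field is assigned twice.
  LockManagerSketch(Properties props, String partitionPath, String fileId) {
    this(withLockKey(props, partitionPath + "/" + fileId), 3);
  }

  private LockManagerSketch(Properties props, int maxRetries) {
    this.lockProps = props;
    this.maxRetries = maxRetries;
  }

  private static Properties withLockKey(Properties base, String key) {
    Properties copy = new Properties();
    copy.putAll(base);
    copy.setProperty("hoodie.write.lock.zookeeper.lock_key", key); // illustrative key
    return copy;
  }

  String lockKey() {
    return lockProps.getProperty("hoodie.write.lock.zookeeper.lock_key", "");
  }

  public static void main(String[] args) {
    LockManagerSketch m = new LockManagerSketch(new Properties(), "2021/06/23", "file-1");
    System.out.println(m.lockKey());
  }
}
```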



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java:
##########
@@ -52,16 +58,46 @@ public WriteMarkers(String basePath, String markerFolderPath, String instantTime
     this.instantTime = instantTime;
   }
 
+  public Option<Path> create(String partitionPath, String dataFileName, IOType type) {
+    return create(partitionPath, dataFileName, type, Option.empty());
+  }
+
   /**
    * Creates a marker without checking if the marker already exists.
    *
    * @param partitionPath partition path in the table
    * @param dataFileName data file name
    * @param type  write IO type
+   * @param handler could be empty
    * @return the marker path
    */
-  public Option<Path> create(String partitionPath, String dataFileName, IOType type) {
-    return create(partitionPath, dataFileName, type, false);
+  public Option<Path> create(String partitionPath, String dataFileName, IOType type, Option<HoodieWriteHandle> handler) {
+    boolean checkIfExists = false;
+
+    if (handler.isPresent()
+        && handler.get().getConfig().getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+        && handler.get().getConfig().isEarlyConflictDetectionEnable()) {
+
+      HoodieTableMetaClient metaClient = handler.get().getHoodieTableMetaClient();
+      HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+
+      HoodieTimeline pendingCompactionTimeline = activeTimeline.filterPendingCompactionTimeline();
+      HoodieTimeline pendingReplaceTimeline = activeTimeline.filterPendingReplaceTimeline();
+      // TODO if current is compact or clustering then create marker directly without early conflict detection.
+      // Need to support early conflict detection between table service and common writers.
+      if (pendingCompactionTimeline.containsInstant(instantTime) || pendingReplaceTimeline.containsInstant(instantTime)) {

Review Comment:
   Sounds good.  @zhangyue19921010 could you add the details around this implementation decision to the RFC?



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java:
##########
@@ -86,7 +86,17 @@ public void startServer() throws IOException {
          .enableMarkerRequests(true)
          .markerBatchNumThreads(writeConfig.getMarkersTimelineServerBasedBatchNumThreads())
          .markerBatchIntervalMs(writeConfig.getMarkersTimelineServerBasedBatchIntervalMs())
-          .markerParallelism(writeConfig.getMarkersDeleteParallelism());
+          .markerParallelism(writeConfig.getMarkersDeleteParallelism())
+      ;

Review Comment:
   nit: the trailing semicolon should be on the same line.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -155,6 +160,20 @@ protected Option<Path> create(String partitionPath, String dataFileName, IOType
     return create(getMarkerPath(partitionPath, dataFileName, type), checkIfExists);
   }
 
+  @Override
+  public Option<Path> createWithEarlyConflictDetection(String partitionPath, String dataFileName, IOType type, boolean checkIfExists, Set<HoodieInstant> completedCommitInstants,

Review Comment:
   Could `completedCommitInstants` be derived from `activeTimeline`?  If so, there is no need to pass in `completedCommitInstants`.
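A rough sketch of the derivation the reviewer has in mind, with plain `{instantTime, state}` pairs standing in for `HoodieInstant` (in Hudi the real call chain would be along the lines of filtering completed instants off the active timeline):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Sketch: derive the set of completed instants from a timeline view on demand,
// instead of threading a precomputed Set through the call chain.
class DerivedInstantsSketch {
  // Each entry: {instantTime, state}; keep only the COMPLETED ones.
  static Set<String> completedCommits(List<String[]> timeline) {
    return timeline.stream()
        .filter(e -> "COMPLETED".equals(e[1]))
        .map(e -> e[0])
        .collect(Collectors.toSet());
  }

  public static void main(String[] args) {
    List<String[]> timeline = Arrays.asList(
        new String[]{"001", "COMPLETED"},
        new String[]{"002", "INFLIGHT"},
        new String[]{"003", "COMPLETED"});
    System.out.println(completedCommits(timeline));
  }
}
```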



##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieTimelineServerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import java.util.Set;
+
+public abstract class HoodieTimelineServerBasedEarlyConflictDetectionStrategy implements HoodieEarlyConflictDetectionStrategy {

Review Comment:
   Implementation-wise, `HoodieEarlyConflictDetectionStrategy` is used by the write client.  I suggest that the timeline-server-related logic, i.e., the `fresh` method containing the scheduling of the marker checker, be moved to the timeline server.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java:
##########
@@ -52,16 +58,47 @@ public WriteMarkers(String basePath, String markerFolderPath, String instantTime
     this.instantTime = instantTime;
   }
 
+  public Option<Path> create(String partitionPath, String dataFileName, IOType type) {
+    return create(partitionPath, dataFileName, type, Option.empty());
+  }
+
   /**
    * Creates a marker without checking if the marker already exists.
    *
    * @param partitionPath partition path in the table
    * @param dataFileName data file name
    * @param type  write IO type
+   * @param handler could be empty
    * @return the marker path
    */
-  public Option<Path> create(String partitionPath, String dataFileName, IOType type) {
-    return create(partitionPath, dataFileName, type, false);
+  public Option<Path> create(String partitionPath, String dataFileName, IOType type, Option<HoodieWriteHandle> handler) {

Review Comment:
   nit: a `HoodieWriteHandle` instance should not be directly passed in here.  Instead, only pass in what's needed, i.e., the relevant write config, the timeline, and the file ID, to avoid exposing `HoodieWriteHandle`.
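A sketch of the narrowing the reviewer suggests; the type and method names below are hypothetical, not part of the PR. The idea is to bundle only the fields marker creation needs into a small value object, so `WriteMarkers` never sees the handle:

```java
// Hypothetical value object: carries only what marker creation needs,
// instead of the whole HoodieWriteHandle.
class MarkerCreationContext {
  private final boolean earlyConflictDetectionEnabled;
  private final boolean optimisticConcurrency;
  private final String fileId;

  MarkerCreationContext(boolean earlyConflictDetectionEnabled,
                        boolean optimisticConcurrency,
                        String fileId) {
    this.earlyConflictDetectionEnabled = earlyConflictDetectionEnabled;
    this.optimisticConcurrency = optimisticConcurrency;
    this.fileId = fileId;
  }

  // The create(...) path only needs this single decision, not the handle.
  boolean shouldDetectEarlyConflicts() {
    return optimisticConcurrency && earlyConflictDetectionEnabled;
  }

  String getFileId() {
    return fileId;
  }

  public static void main(String[] args) {
    MarkerCreationContext ctx = new MarkerCreationContext(true, true, "file-1");
    System.out.println(ctx.shouldDetectEarlyConflicts());
  }
}
```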



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -62,6 +63,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
     metrics = new HoodieLockMetrics(writeConfig);
   }
 
+  /**
+   * Try to have a lock at partitionPath + fileID level for different write handler.
+   * @param writeConfig
+   * @param fs
+   * @param partitionPath
+   * @param fileId
+   */
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) {
+    this.writeConfig = writeConfig;
+    this.hadoopConf = new SerializableConfiguration(fs.getConf());
+    TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" + fileId);
+    this.lockConfiguration = new LockConfiguration(props);
+    maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+        Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
+    maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+        Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
+  }
+
+  /**
+   * rebuild lock related configs, only support ZK related lock for now.
+   */
+  private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) {
+    TypedProperties props = new TypedProperties(writeConfig.getProps());
+    props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key);

Review Comment:
   Sounds good.  Let's use `LOCK_PROVIDER_CLASS_NAME` instead of `ZK_BASE_PATH_PROP_KEY` for checking whether a ZK-based lock is configured.
   
   @zhangyue19921010 could you file a JIRA ticket besides the TODO because this requires more work?
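What the suggested check might look like (a sketch: the key string mirrors Hudi's `hoodie.write.lock.provider` config, but treat it as an assumption here). The decision keys off the configured lock provider class rather than the presence of a ZK base path:

```java
import java.util.Properties;

// Sketch: decide "is a ZK-based lock configured?" from the lock provider
// class name, not from whether a ZK base path happens to be set.
class LockProviderCheckSketch {
  // Illustrative constant mirroring hoodie.write.lock.provider.
  static final String LOCK_PROVIDER_CLASS_NAME = "hoodie.write.lock.provider";

  static boolean isZkLockConfigured(Properties props) {
    String provider = props.getProperty(LOCK_PROVIDER_CLASS_NAME, "");
    return provider.contains("ZookeeperBasedLockProvider");
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty(LOCK_PROVIDER_CLASS_NAME,
        "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider");
    System.out.println(isZkLockConfigured(props));
  }
}
```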



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -57,6 +62,14 @@ public void beginTransaction(Option<HoodieInstant> 
newTxnOwnerInstant,
     }
   }
 
+  public void beginTransaction(String partitionPath, String fileId) {

Review Comment:
   This can be simplified without taking the arguments because the constructor takes the partition path and file ID.  The transaction manager for the direct markers should be implemented in a separate class to isolate the logic.
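A rough shape for the isolation the reviewer suggests (the class name is hypothetical): a dedicated file-level transaction manager fixes its lock scope at construction, so `beginTransaction()` needs no arguments and the base `TransactionManager` keeps its instant-based API.

```java
// Hypothetical: a file-level transaction manager fixes its lock scope at
// construction; beginTransaction() then takes no arguments.
class FileLevelTransactionManagerSketch {
  private final String lockScope;
  private boolean inTransaction;

  FileLevelTransactionManagerSketch(String partitionPath, String fileId) {
    this.lockScope = partitionPath + "/" + fileId;
  }

  void beginTransaction() {
    // real code would acquire the lock for lockScope here
    inTransaction = true;
  }

  void endTransaction() {
    // real code would release the lock here
    inTransaction = false;
  }

  String getLockScope() {
    return lockScope;
  }

  boolean isInTransaction() {
    return inTransaction;
  }

  public static void main(String[] args) {
    FileLevelTransactionManagerSketch tm =
        new FileLevelTransactionManagerSketch("2021/06/23", "file-1");
    tm.beginTransaction();
    System.out.println(tm.getLockScope());
  }
}
```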



##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java:
##########
@@ -124,6 +124,29 @@ public static class Config implements Serializable {
    @Parameter(names = {"--marker-parallelism", "-mdp"}, description = "Parallelism to use for reading and deleting marker files")
     public int markerParallelism = 100;
 
+    @Parameter(names = {"--early-conflict-detection-strategy"}, description = "Early conflict detection class name, this should be subclass of "
+        + "org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy")
+    public String earlyConflictDetectStrategy = "org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineMarkerEarlyConflictDetectionStrategy";
+
+    @Parameter(names = {"--early-conflict-detection-check-commit-conflict"}, description = "Enable check commit conflict or not during early conflict detect")
+    public Boolean checkCommitConflict = false;
+
+    @Parameter(names = {"--early-conflict-detection-enable"}, description = "Enable early conflict detection based on markers. It will try to detect writing conflict "
+        + "before create markers and fast fail which will release cluster resources as soon as possible.")
+    public Boolean earlyConflictDetectionEnable = false;
+
+    @Parameter(names = {"--early-conflict-async-checker-batch-interval"}, description = "Used for timeline based marker AsyncTimelineMarkerConflictResolutionStrategy. "

Review Comment:
   nit: rename the timeline server configs to be aligned with `HoodieWriteConfig`



##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class HoodieDirectMarkerBasedEarlyConflictDetectionStrategy implements HoodieEarlyConflictDetectionStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+  protected final String basePath;
+  protected final FileSystem fs;
+  protected final String partitionPath;
+  protected final String fileId;
+  protected final String instantTime;
+  protected final HoodieActiveTimeline activeTimeline;
+  protected final HoodieConfig config;
+  protected Set<HoodieInstant> completedCommitInstants;
+  protected final Boolean checkCommitConflict;
+  protected final Long maxAllowableHeartbeatIntervalInMs;
+
+  public HoodieDirectMarkerBasedEarlyConflictDetectionStrategy(String basePath, HoodieWrapperFileSystem fs, String partitionPath, String fileId, String instantTime,
+                                                               HoodieActiveTimeline activeTimeline, HoodieConfig config, Boolean checkCommitConflict, Long maxAllowableHeartbeatIntervalInMs,
+                                                               HashSet<HoodieInstant> completedCommitInstants) {
+    this.basePath = basePath;
+    this.fs = fs;
+    this.partitionPath = partitionPath;
+    this.fileId = fileId;
+    this.instantTime = instantTime;
+    this.completedCommitInstants = completedCommitInstants;
+    this.activeTimeline = activeTimeline;
+    this.config = config;
+    this.checkCommitConflict = checkCommitConflict;
+    this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
+  }
+
+  /**
+   * We need to do list operation here.
+   * In order to reduce the list pressure as much as possible, first we build path prefix in advance:  '$base_path/.temp/instant_time/partition_path',
+   * and only list these specific partition_paths we need instead of list all the '$base_path/.temp/'
+   * @param basePath
+   * @param partitionPath
+   * @param fileId 162b13d7-9530-48cf-88a4-02241817ae0c-0_1-74-100_003.parquet
+   * @return true if current fileID is already existed under .temp/instant_time/partition_path/..
+   * @throws IOException
+   */
+  public boolean checkMarkerConflict(HoodieActiveTimeline activeTimeline, String basePath, String partitionPath, String fileId,

Review Comment:
   Similarly, the arguments should be simplified since the constructor takes most of the information already.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -43,23 +47,52 @@
 public class LockManager implements Serializable, AutoCloseable {
 
   private static final Logger LOG = LogManager.getLogger(LockManager.class);
-  private final HoodieWriteConfig writeConfig;
-  private final LockConfiguration lockConfiguration;
-  private final SerializableConfiguration hadoopConf;
-  private final int maxRetries;
-  private final long maxWaitTimeInMs;
+  private HoodieWriteConfig writeConfig;
+  private LockConfiguration lockConfiguration;
+  private SerializableConfiguration hadoopConf;
+  private int maxRetries;
+  private long maxWaitTimeInMs;
   private transient HoodieLockMetrics metrics;
   private volatile LockProvider lockProvider;
 
   public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+    init(writeConfig, fs.getConf(), writeConfig.getProps());
+  }
+
+  /**
+   * Try to have a lock at partitionPath + fileID level for different write handler.
+   * @param writeConfig
+   * @param fs
+   * @param partitionPath
+   * @param fileId
+   */
+  public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) {
+    TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" + fileId);
+    init(writeConfig, fs.getConf(), props);
+  }
+
+  private void init(HoodieWriteConfig writeConfig, Configuration conf, TypedProperties lockProps) {
+    this.lockConfiguration = new LockConfiguration(lockProps);
     this.writeConfig = writeConfig;
-    this.hadoopConf = new SerializableConfiguration(fs.getConf());
-    this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
-    maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+    this.hadoopConf = new SerializableConfiguration(conf);
+    this.maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
         Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
-    maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+    this.maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
         Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
-    metrics = new HoodieLockMetrics(writeConfig);
+    this.metrics = new HoodieLockMetrics(writeConfig);
+  }
+
+  /**
+   * rebuild lock related configs, only support ZK related lock for now.
+   */
+  private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) {

Review Comment:
   It is better to move this logic of changing lock configs to the transaction manager for the direct markers, as the lock manager itself should not be aware of any higher-layer logic.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.conflict.detection.HoodieDirectMarkerBasedEarlyConflictDetectionStrategy;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This strategy is used for direct marker writers, trying to do early conflict detection.
+ * It will use fileSystem api like list and exist directly to check if there is any marker file conflict.
+ */
+public class SimpleDirectMarkerBasedEarlyConflictDetectionStrategy extends HoodieDirectMarkerBasedEarlyConflictDetectionStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+
+  public SimpleDirectMarkerBasedEarlyConflictDetectionStrategy(String basePath, HoodieWrapperFileSystem fs, String partitionPath, String fileId, String instantTime,

Review Comment:
   nit: the full write config is not needed.  We should simplify the list of arguments passed in here.



##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hudi.common.conflict.detection.HoodieDirectMarkerBasedEarlyConflictDetectionStrategy;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ConcurrentModificationException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This strategy is used for direct marker writers, trying to do early conflict detection.
+ * It will use fileSystem api like list and exist directly to check if there is any marker file conflict.
+ */
+public class SimpleDirectMarkerBasedEarlyConflictDetectionStrategy extends HoodieDirectMarkerBasedEarlyConflictDetectionStrategy {
+
+  private static final Logger LOG = LogManager.getLogger(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+
+  public SimpleDirectMarkerBasedEarlyConflictDetectionStrategy(String basePath, HoodieWrapperFileSystem fs, String partitionPath, String fileId, String instantTime,
+                                                               HoodieActiveTimeline activeTimeline, HoodieWriteConfig config, Boolean checkCommitConflict) {
+    super(basePath, fs, partitionPath, fileId, instantTime, activeTimeline, config, checkCommitConflict);
+  }
+
+  @Override
+  public boolean hasMarkerConflict() {
+    try {
+      return checkMarkerConflict(basePath, partitionPath, fileId, fs, instantTime)
+          || (checkCommitConflict && MarkerUtils.hasCommitConflict(Stream.of(fileId).collect(Collectors.toSet()), basePath, completedCommitInstants));
+    } catch (IOException e) {
+      LOG.warn("Exception occurs during create marker file in eager conflict detection mode.");
+      throw new HoodieIOException("Exception occurs during create marker file in eager conflict detection mode.", e);
+    }
+  }
+
+  @Override
+  public void resolveMarkerConflict(String basePath, String partitionPath, String dataFileName) {
+    throw new HoodieEarlyConflictDetectionException(new ConcurrentModificationException("Early conflict detected but cannot resolve conflicts for overlapping writes"));

Review Comment:
   As @zhangyue19921010 explained, this is unnecessary because compaction and 
clustering instants do not go through the early conflict detection logic.



##########
hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java:
##########
@@ -208,10 +219,116 @@ public static Set<String> readMarkersFromFile(Path markersFilePath, Serializable
       fsDataInputStream = fs.open(markersFilePath);
       markers = new HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
     } catch (IOException e) {
-      throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
+      if (ignoreException) {
+        LOG.warn("IOException occurs during read MARKERS file, ", e);
+      } else {
+        throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
+      }
     } finally {
       closeQuietly(fsDataInputStream);
     }
     return markers;
   }
+
+  public static List<Path> getAllMarkerDir(Path tempPath, FileSystem fs) throws IOException {
+    return Arrays.stream(fs.listStatus(tempPath)).map(FileStatus::getPath).collect(Collectors.toList());
+  }
+
+  public static boolean hasCommitConflict(HoodieActiveTimeline activeTimeline, Set<String> currentFileIDs, Set<HoodieInstant> completedCommitInstants) {
+
+    Set<HoodieInstant> currentInstants = activeTimeline.reload().getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+
+    currentInstants.removeAll(completedCommitInstants);
+    Set<String> missingFileIDs = currentInstants.stream().flatMap(instant -> {
+      try {
+        return HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instant).get(), HoodieCommitMetadata.class)
+            .getFileIdAndRelativePaths().keySet().stream();
+      } catch (Exception e) {
+        return Stream.empty();
+      }
+    }).collect(Collectors.toSet());
+    currentFileIDs.retainAll(missingFileIDs);
+    return !currentFileIDs.isEmpty();
+  }
+
+  /**
+   * Get Candidate Instant to do conflict checking:
+   * 1. Skip current writer related instant(currentInstantTime)
+   * 2. Skip all instants after currentInstantTime
+   * 3. Skip dead writers related instants based on heart-beat
+   * 4. Skip pending compaction instant (For now we don't do early conflict check with compact action)
+   *      Because we don't want to let pending compaction block common writer.
+   * @param instants
+   * @return
+   */
+  public static List<String> getCandidateInstants(HoodieActiveTimeline activeTimeline, List<Path> instants, String currentInstantTime,
+                                                  long maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) {
+
+    HoodieActiveTimeline reloadActive = activeTimeline.reload();
+
+    return instants.stream().map(Path::toString).filter(instantPath -> {
+      String instantTime = markerDirToInstantTime(instantPath);
+      return instantTime.compareToIgnoreCase(currentInstantTime) < 0
+          && !reloadActive.filterPendingCompactionTimeline().containsInstant(instantTime)
+          && !reloadActive.filterPendingReplaceTimeline().containsInstant(instantTime);
+    }).filter(instantPath -> {
+      try {
+        return !isHeartbeatExpired(markerDirToInstantTime(instantPath), maxAllowableHeartbeatIntervalInMs, fs, basePath);
+      } catch (IOException e) {
+        return false;
+      }
+    }).collect(Collectors.toList());
+  }
+
+  /**
+   * Get fileID from full marker path, for example:
+   * 20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0_85-15-1390_20220620181735781.parquet.marker.MERGE
+   *    ==> get 20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0
+   * @param marker
+   * @return
+   */
+  public static String makerToPartitionAndFileID(String marker) {
+    String[] ele = marker.split("_");
+    return ele[0];
+  }
+
+  /**
+   * Get instantTime from full marker path, for example:
+   * /var/folders/t3/th1dw75d0yz2x2k2qt6ys9zh0000gp/T/junit6502909693741900820/dataset/.hoodie/.temp/003
+   *    ==> 003
+   * @param marker
+   * @return
+   */
+  public static String markerDirToInstantTime(String marker) {
+    String[] ele = marker.split("/");
+    return ele[ele.length - 1];
+  }
+
+  /**
+   * Use modification time as last heart beat time
+   * @param fs
+   * @param basePath
+   * @param instantTime
+   * @return
+   * @throws IOException
+   */
+  public static Long getLastHeartbeatTime(FileSystem fs, String basePath, String instantTime) throws IOException {
+    Path heartbeatFilePath = new Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR + instantTime);
+    if (fs.exists(heartbeatFilePath)) {
+      return fs.getFileStatus(heartbeatFilePath).getModificationTime();
+    } else {
+      // NOTE : This can happen when a writer is upgraded to use lazy cleaning and the last write had failed
+      return 0L;
+    }
+  }
+
+  public static boolean isHeartbeatExpired(String instantTime, long maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) throws IOException {
+    Long currentTime = System.currentTimeMillis();
+    Long lastHeartbeatTime = getLastHeartbeatTime(fs, basePath, instantTime);
+    if (currentTime - lastHeartbeatTime > maxAllowableHeartbeatIntervalInMs) {
+      LOG.warn("Heartbeat expired, for instant: " + instantTime);
+      return true;
+    }
+    return false;
+  }

Review Comment:
   Given the limitation, now it makes sense to move `HeartbeatUtils` to `hudi-common`, so common utils can be shared instead of code duplication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
