yihua commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1023177121
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -62,6 +63,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
metrics = new HoodieLockMetrics(writeConfig);
}
+ /**
+ * Try to have a lock at partitionPath + fileID level for different write handler.
+ * @param writeConfig
+ * @param fs
+ * @param partitionPath
+ * @param fileId
+ */
+ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) {
Review Comment:
Does this constructor also need to initialize `metrics`?
`metrics = new HoodieLockMetrics(writeConfig);`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -62,6 +63,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
metrics = new HoodieLockMetrics(writeConfig);
}
+ /**
+ * Try to have a lock at partitionPath + fileID level for different write handler.
+ * @param writeConfig
+ * @param fs
+ * @param partitionPath
+ * @param fileId
+ */
+ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) {
+ this.writeConfig = writeConfig;
+ this.hadoopConf = new SerializableConfiguration(fs.getConf());
+ TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" + fileId);
+ this.lockConfiguration = new LockConfiguration(props);
+ maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+ Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
+ maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+ Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
Review Comment:
nit: extract the common init logic into a method instead of copying the code?
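A minimal sketch of the suggested refactor, which also covers the missing `metrics` initialization noted above: both constructors funnel into one private `init` method. `LockManagerSketch`, `LockMetricsStub`, the property keys, and the default values are hypothetical stand-ins for `LockManager`, `HoodieLockMetrics`, and the `LOCK_ACQUIRE_CLIENT_*` configs, chosen so the example runs without Hudi on the classpath.

```java
import java.util.Properties;

// Hypothetical stand-in for HoodieLockMetrics; it only records the config it was built with.
class LockMetricsStub {
  final Properties config;

  LockMetricsStub(Properties config) {
    this.config = config;
  }
}

// Hypothetical, simplified LockManager: both constructors funnel into init().
class LockManagerSketch {
  // Illustrative keys and defaults, not the real HoodieLockConfig values.
  static final String NUM_RETRIES_KEY = "lock.client.num_retries";
  static final String RETRY_WAIT_MS_KEY = "lock.client.retry_wait_ms";
  static final String LOCK_KEY = "lock.key";

  Properties lockProps;
  int maxRetries;
  long maxWaitTimeInMs;
  LockMetricsStub metrics;

  LockManagerSketch(Properties writeProps) {
    init(copyOf(writeProps));
  }

  // File-level variant: only the lock key differs, everything else is shared.
  LockManagerSketch(Properties writeProps, String partitionPath, String fileId) {
    Properties props = copyOf(writeProps);
    props.setProperty(LOCK_KEY, partitionPath + "/" + fileId);
    init(props);
  }

  private static Properties copyOf(Properties source) {
    Properties copy = new Properties();
    copy.putAll(source);
    return copy;
  }

  // The logic previously duplicated across constructors, including metrics setup.
  private void init(Properties props) {
    this.lockProps = props;
    this.maxRetries = Integer.parseInt(props.getProperty(NUM_RETRIES_KEY, "3"));
    this.maxWaitTimeInMs = Long.parseLong(props.getProperty(RETRY_WAIT_MS_KEY, "5000"));
    this.metrics = new LockMetricsStub(props);
  }
}
```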
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -62,6 +63,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
metrics = new HoodieLockMetrics(writeConfig);
}
+ /**
+ * Try to have a lock at partitionPath + fileID level for different write handler.
+ * @param writeConfig
+ * @param fs
+ * @param partitionPath
+ * @param fileId
+ */
+ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String partitionPath, String fileId) {
+ this.writeConfig = writeConfig;
+ this.hadoopConf = new SerializableConfiguration(fs.getConf());
+ TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/" + fileId);
+ this.lockConfiguration = new LockConfiguration(props);
+ maxRetries = lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+ Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
+ maxWaitTimeInMs = lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+ Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
+ }
+
+ /**
+ * rebuild lock related configs, only support ZK related lock for now.
+ */
+ private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig, String key) {
+ TypedProperties props = new TypedProperties(writeConfig.getProps());
+ props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key);
Review Comment:
Here it should check if the ZK-based lock is configured. Otherwise, it
should throw an exception.
Generally, we should think about how to support different lock provider
implementations. For the first cut, it may be okay to have this specific logic
here.
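A sketch of the fail-fast check, assuming the provider is selected via `hoodie.write.lock.provider` and the ZooKeeper lock key is `hoodie.write.lock.zookeeper.lock_key` (both names should be verified against `HoodieLockConfig` and `LockConfiguration`). It uses plain `Properties` and `IllegalArgumentException` in place of `TypedProperties` and a Hudi-specific exception type.

```java
import java.util.Properties;

// Hypothetical sketch of a guarded refreshLockConfig; key and class names are assumptions.
final class LockConfigRefresher {
  static final String LOCK_PROVIDER_KEY = "hoodie.write.lock.provider";
  static final String ZK_LOCK_KEY = "hoodie.write.lock.zookeeper.lock_key";
  static final String ZK_PROVIDER_CLASS =
      "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider";

  private LockConfigRefresher() {
  }

  static Properties refreshLockConfig(Properties writeProps, String key) {
    String provider = writeProps.getProperty(LOCK_PROVIDER_KEY);
    // Fail fast: the per-file lock key is only wired up for the ZooKeeper provider so far.
    if (!ZK_PROVIDER_CLASS.equals(provider)) {
      throw new IllegalArgumentException(
          "File-level locking requires " + ZK_PROVIDER_CLASS + ", but the configured provider is " + provider);
    }
    // Copy so the caller's write config is not mutated.
    Properties props = new Properties();
    props.putAll(writeProps);
    props.setProperty(ZK_LOCK_KEY, key);
    return props;
  }
}
```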
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -189,6 +190,33 @@ public class HoodieLockConfig extends HoodieConfig {
.withDocumentation("Lock provider class name, this should be subclass of "
+ "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
+ // Pluggable strategies to use when early conflict detection
+ public static final ConfigProperty<String> EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+ .key(LOCK_PREFIX + "early.conflict.detection.strategy")
+ .defaultValue(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName())
+ .sinceVersion("0.12.0")
+ .withDocumentation("Early conflict detection class name, this should be subclass of "
+ + "org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy");
+
+ public static final ConfigProperty<Boolean> EARLY_CONFLICT_DETECTION_ENABLE = ConfigProperty
+ .key(LOCK_PREFIX + "early.conflict.detection.enable")
+ .defaultValue(false)
+ .sinceVersion("0.12.0")
Review Comment:
nit: now this should be planned for `0.13.0` :)
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -67,6 +80,14 @@ public void endTransaction(Option<HoodieInstant> currentTxnOwnerInstant) {
}
}
+ public void endTransaction(String filePath) {
Review Comment:
Align the argument with `beginTransaction()`, using the same set of
arguments?
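A sketch of the symmetric API, assuming both calls take `(partitionPath, fileId)` and derive the same lock key. `FileLevelTransactionManager` is hypothetical and uses an in-process `ReentrantLock` per key purely for illustration; the real manager would delegate to the configured lock provider.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: begin/end take the same arguments and derive the same lock key.
class FileLevelTransactionManager {
  // One lock per partitionPath/fileId; entries are never evicted in this sketch.
  private final ConcurrentHashMap<String, ReentrantLock> locks = new ConcurrentHashMap<>();

  private static String lockKey(String partitionPath, String fileId) {
    return partitionPath + "/" + fileId;
  }

  void beginTransaction(String partitionPath, String fileId) {
    locks.computeIfAbsent(lockKey(partitionPath, fileId), k -> new ReentrantLock()).lock();
  }

  void endTransaction(String partitionPath, String fileId) {
    ReentrantLock lock = locks.get(lockKey(partitionPath, fileId));
    if (lock == null || !lock.isHeldByCurrentThread()) {
      throw new IllegalStateException("No transaction held for " + lockKey(partitionPath, fileId));
    }
    lock.unlock();
  }

  boolean isLocked(String partitionPath, String fileId) {
    ReentrantLock lock = locks.get(lockKey(partitionPath, fileId));
    return lock != null && lock.isLocked();
  }
}
```

Keeping the key derivation in one private method means `begin` and `end` cannot drift apart.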
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -189,6 +190,33 @@ public class HoodieLockConfig extends HoodieConfig {
.withDocumentation("Lock provider class name, this should be subclass of "
+ "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
+ // Pluggable strategies to use when early conflict detection
+ public static final ConfigProperty<String> EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+ .key(LOCK_PREFIX + "early.conflict.detection.strategy")
+ .defaultValue(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName())
Review Comment:
Should the default strategy depend on the configured marker type, using
`.withInferFunction()`? An example of `withInferFunction()`:
```
public static final ConfigProperty<String> DYNAMODB_LOCK_PARTITION_KEY = ConfigProperty
    .key(DYNAMODB_BASED_LOCK_PROPERTY_PREFIX + "partition_key")
    .noDefaultValue()
    .sinceVersion("0.10.0")
    .withInferFunction(cfg -> {
      if (cfg.contains(HoodieTableConfig.NAME)) {
        return Option.of(cfg.getString(HoodieTableConfig.NAME));
      }
      return Option.empty();
    })
    .withDocumentation("For DynamoDB based lock provider, the partition key for the DynamoDB lock table. "
        + "Each Hudi dataset should has it's unique key so concurrent writers could refer to the same partition key."
        + " By default we use the Hudi table name specified to be the partition key");
```
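A sketch of what such an infer function could compute, assuming the marker type is read from `hoodie.write.markers.type` with the values `DIRECT` and `TIMELINE_SERVER_BASED`. It uses a plain `Map` and `java.util.Optional` in place of Hudi's config object and `Option`, and the strategy names are the simple class names from this PR (package prefixes omitted).

```java
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of an infer function for the early-conflict-detection strategy default.
final class EarlyConflictStrategyInference {
  // Assumed marker-type key; verify against HoodieWriteConfig.
  static final String MARKERS_TYPE_KEY = "hoodie.write.markers.type";
  // Simple class names from this PR; package prefixes omitted.
  static final String TIMELINE_SERVER_STRATEGY = "AsyncTimelineMarkerEarlyConflictDetectionStrategy";
  static final String DIRECT_STRATEGY = "SimpleDirectMarkerBasedEarlyConflictDetectionStrategy";

  private EarlyConflictStrategyInference() {
  }

  // Returns empty when the marker type is unset or unrecognized.
  static Optional<String> inferStrategy(Map<String, String> cfg) {
    String markerType = cfg.get(MARKERS_TYPE_KEY);
    if (markerType == null) {
      return Optional.empty();
    }
    switch (markerType.toUpperCase(Locale.ROOT)) {
      case "DIRECT":
        return Optional.of(DIRECT_STRATEGY);
      case "TIMELINE_SERVER_BASED":
        return Optional.of(TIMELINE_SERVER_STRATEGY);
      default:
        return Optional.empty();
    }
  }
}
```

In the PR, this mapping would sit inside `.withInferFunction(cfg -> ...)`, as in the DynamoDB example.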
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -187,8 +194,54 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
* @param partitionPath Partition path
*/
protected void createMarkerFile(String partitionPath, String dataFileName) {
- WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
- .create(partitionPath, dataFileName, getIOType());
+ WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ // do early conflict detection before create markers.
+ if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+ && config.isEarlyConflictDetectionEnable()) {
+ HoodieEarlyConflictDetectionStrategy earlyConflictDetectionStrategy = config.getEarlyConflictDetectionStrategy();
+ if (earlyConflictDetectionStrategy instanceof HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy) {
+ createMarkerWithTransaction(earlyConflictDetectionStrategy, writeMarkers, partitionPath, dataFileName);
+ } else {
+ createMarkerWithEarlyConflictDetection(earlyConflictDetectionStrategy, writeMarkers, partitionPath, dataFileName);
+ }
+ } else {
+ // create marker directly
+ writeMarkers.create(partitionPath, dataFileName, getIOType());
+ }
+ }
+
+ private Option<Path> createMarkerWithEarlyConflictDetection(HoodieEarlyConflictDetectionStrategy resolutionStrategy,
+ WriteMarkers writeMarkers,
+ String partitionPath,
+ String dataFileName) {
+ Set<HoodieInstant> completedCommitInstants = hoodieTable.getMetaClient().getActiveTimeline()
Review Comment:
I think the `hoodieTable` here should be initialized at the beginning of the
transaction and used throughout the transaction. Let's make sure that's the
case, so that it always returns the snapshot of the timeline taken at the
beginning of the transaction (later concurrent writes should not leak in).
That is what guarantees correct behavior for conflict detection.
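A minimal sketch of the idea, assuming the timeline can be read as a set of completed instant times: take a read-only copy once when the transaction begins and hand out only that copy afterwards. `TransactionScopedTimeline` and the `Supplier` used to read the timeline are hypothetical.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical sketch: capture the completed-instant set once when the transaction begins.
class TransactionScopedTimeline {
  private final Set<String> completedInstantsAtStart;

  TransactionScopedTimeline(Supplier<Set<String>> timelineReader) {
    // Defensive, read-only copy so later commits by concurrent writers cannot leak in.
    this.completedInstantsAtStart =
        Collections.unmodifiableSet(new HashSet<>(timelineReader.get()));
  }

  // Every conflict check in this transaction sees the same snapshot.
  Set<String> completedInstants() {
    return completedInstantsAtStart;
  }
}
```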
##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class HoodieDirectMarkerBasedEarlyConflictDetectionStrategy implements HoodieEarlyConflictDetectionStrategy {
+ private static final Logger LOG = LogManager.getLogger(HoodieDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+
+ public abstract boolean hasMarkerConflict(String basePath, FileSystem fs, String partitionPath, String dataFileName, String instantTime,
+ Set<HoodieInstant> completedCommitInstants, HoodieTableMetaClient metaClient);
+
+ public abstract void resolveMarkerConflict(String basePath, String partitionPath, String dataFileName);
+
+ /**
+ * We need to do list operation here.
+ * In order to reduce the list pressure as much as possible, first we build path prefix in advance: '$base_path/.temp/instant_time/partition_path',
+ * and only list these specific partition_paths we need instead of list all the '$base_path/.temp/'
+ * @param basePath
+ * @param partitionPath
+ * @param fileId 162b13d7-9530-48cf-88a4-02241817ae0c-0_1-74-100_003.parquet
+ * @return true if current fileID is already existed under .temp/instant_time/partition_path/..
+ * @throws IOException
+ */
+ public boolean checkMarkerConflict(String basePath, String partitionPath, String fileId,
+ FileSystem fs, String instantTime) throws IOException {
+ String tempFolderPath = getTempFolderPath(basePath);
+ long res = Arrays.stream(fs.listStatus(new Path(tempFolderPath)))
+ .parallel()
+ .map(FileStatus::getPath)
+ .filter(markerPath -> {
+ return !markerPath.getName().equalsIgnoreCase(instantTime);
+ })
Review Comment:
Could we somehow reuse `HoodieTableMetaClient::getMarkerFolderPath(String
instantTs)` to avoid one list call?
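A minimal sketch of the idea, assuming the candidate instant times are already known (for example, from the pending instants on the active timeline) and that marker folders follow the `<basePath>/.hoodie/.temp/<instantTs>` layout that `getMarkerFolderPath(instantTs)` is expected to produce. `MarkerPaths` and its methods are hypothetical; in the real code the path would come from the meta client rather than string concatenation.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical helper mirroring what getMarkerFolderPath(instantTs) is expected to return.
final class MarkerPaths {
  // Assumed layout of Hudi's temp folder (HoodieTableMetaClient.TEMPFOLDER_NAME).
  static final String TEMP_FOLDER = ".hoodie/.temp";

  private MarkerPaths() {
  }

  static String markerFolderPath(String basePath, String instantTs) {
    return basePath + "/" + TEMP_FOLDER + "/" + instantTs;
  }

  // Builds marker dirs for known candidate instants instead of listing the temp folder,
  // skipping the current writer's own instant.
  static List<String> candidateMarkerDirs(String basePath, List<String> candidateInstants, String currentInstant) {
    return candidateInstants.stream()
        .filter(ts -> !ts.equals(currentInstant))
        .map(ts -> markerFolderPath(basePath, ts))
        .collect(Collectors.toList());
  }
}
```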
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/io/HoodieWriteHandle.java:
##########
@@ -187,8 +194,54 @@ protected Path makeNewFilePath(String partitionPath, String fileName) {
* @param partitionPath Partition path
*/
protected void createMarkerFile(String partitionPath, String dataFileName) {
- WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime)
- .create(partitionPath, dataFileName, getIOType());
+ WriteMarkers writeMarkers = WriteMarkersFactory.get(config.getMarkersType(), hoodieTable, instantTime);
+ // do early conflict detection before create markers.
+ if (config.getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+ && config.isEarlyConflictDetectionEnable()) {
+ HoodieEarlyConflictDetectionStrategy earlyConflictDetectionStrategy = config.getEarlyConflictDetectionStrategy();
+ if (earlyConflictDetectionStrategy instanceof HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy) {
+ createMarkerWithTransaction(earlyConflictDetectionStrategy, writeMarkers, partitionPath, dataFileName);
+ } else {
+ createMarkerWithEarlyConflictDetection(earlyConflictDetectionStrategy, writeMarkers, partitionPath, dataFileName);
+ }
+ } else {
+ // create marker directly
+ writeMarkers.create(partitionPath, dataFileName, getIOType());
+ }
Review Comment:
Should we move this logic into the specific `WriteMarkers` implementations,
i.e., the direct-marker-based strategy into `DirectWriteMarkers` and the async
timeline-server-based strategy into `TimelineServerBasedWriteMarkers`? Early
conflict detection is tightly coupled with the marker creation process and the
type of markers.
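A rough sketch of that split, with hypothetical `*Sketch` classes standing in for `WriteMarkers`, `DirectWriteMarkers`, and `TimelineServerBasedWriteMarkers`. The write handle would then only call `createWithEarlyConflictDetection(...)` and throw if it returns empty. Two simplifications are assumptions: conflicts are keyed by `partitionPath/fileId`, and the timeline-server variant gets its conflict flag from a `BooleanSupplier`, whereas in the PR detection and creation happen in a single request to the server.

```java
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.BooleanSupplier;

// Hypothetical base type: each marker implementation decides how it detects conflicts.
abstract class WriteMarkersSketch {
  abstract Optional<String> create(String partitionPath, String fileId);

  abstract boolean hasConflict(String partitionPath, String fileId);

  // Shared flow; an empty result means a conflict was detected and no marker was created.
  Optional<String> createWithEarlyConflictDetection(String partitionPath, String fileId) {
    return hasConflict(partitionPath, fileId) ? Optional.empty() : create(partitionPath, fileId);
  }
}

// Direct markers: check other writers' marker files (a Set stands in for the file listing).
class DirectWriteMarkersSketch extends WriteMarkersSketch {
  private final Set<String> otherWritersMarkers;
  final Set<String> created = new HashSet<>();

  DirectWriteMarkersSketch(Set<String> otherWritersMarkers) {
    this.otherWritersMarkers = otherWritersMarkers;
  }

  @Override
  boolean hasConflict(String partitionPath, String fileId) {
    return otherWritersMarkers.contains(partitionPath + "/" + fileId);
  }

  @Override
  Optional<String> create(String partitionPath, String fileId) {
    String marker = partitionPath + "/" + fileId;
    created.add(marker);
    return Optional.of(marker);
  }
}

// Timeline-server markers: the conflict flag comes from the server's async checker.
class TimelineServerWriteMarkersSketch extends WriteMarkersSketch {
  private final BooleanSupplier serverReportsConflict;

  TimelineServerWriteMarkersSketch(BooleanSupplier serverReportsConflict) {
    this.serverReportsConflict = serverReportsConflict;
  }

  @Override
  boolean hasConflict(String partitionPath, String fileId) {
    return serverReportsConflict.getAsBoolean();
  }

  @Override
  Optional<String> create(String partitionPath, String fileId) {
    return Optional.of(partitionPath + "/" + fileId);
  }
}
```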
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -109,6 +126,7 @@ public void stop() {
}
dispatchingExecutorService.shutdown();
batchingExecutorService.shutdown();
+ checkers.values().forEach(ExecutorService::shutdown);
Review Comment:
Clear the checkers map?
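A sketch of a `stop()` that both shuts down and clears the checkers, assuming `checkers` maps a marker directory to its executor as the diff suggests. `MarkerHandlerSketch` and `startChecker` are hypothetical.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of MarkerHandler.stop() releasing per-marker-dir checkers.
class MarkerHandlerSketch {
  final Map<String, ExecutorService> checkers = new ConcurrentHashMap<>();

  void startChecker(String markerDir) {
    checkers.computeIfAbsent(markerDir, d -> Executors.newSingleThreadScheduledExecutor());
  }

  void stop() {
    checkers.values().forEach(ExecutorService::shutdown);
    // Drop the references so shut-down executors are not reused if the handler restarts.
    checkers.clear();
  }
}
```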
##########
hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java:
##########
@@ -208,10 +216,56 @@ public static Set<String> readMarkersFromFile(Path markersFilePath, Serializable
fsDataInputStream = fs.open(markersFilePath);
markers = new HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
} catch (IOException e) {
- throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
+ if (ignoreException) {
+ LOG.warn("IOException occurs during read MARKERS file, ", e);
+ } else {
+ throw new HoodieIOException("Failed to read MARKERS file " + markersFilePath, e);
+ }
} finally {
closeQuietly(fsDataInputStream);
}
return markers;
}
+
+ /**
+ * Reads files containing the markers written by timeline-server-based marker mechanism locally instead of using cluster Context.
+ *
+ * @param markerDir marker directory.
+ * @param fileSystem file system to use.
+ * @return A {@code Map} of file name to the set of markers stored in the file.
+ */
+ public static Set<String> readTimelineServerBasedMarkersFromFileSystemLocally(String markerDir, FileSystem fileSystem) {
Review Comment:
You can avoid adding this method by reusing the following method with a local
engine context.
```
public static Map<String, Set<String>> readTimelineServerBasedMarkersFromFileSystem(
    String markerDir, FileSystem fileSystem, HoodieEngineContext context, int parallelism)
```
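A sketch of the adaptation: the existing method returns markers grouped per file, so a caller that needs a flat `Set<String>` only has to flatten the values. `MarkerSets.flatten` is a hypothetical helper; the exact call that produces its input, including how a local engine context (for example `HoodieLocalEngineContext`) is constructed, is an assumption to verify.

```java
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper: turn a per-file marker map into one flat set of markers.
final class MarkerSets {
  private MarkerSets() {
  }

  // Input would come from readTimelineServerBasedMarkersFromFileSystem(markerDir, fs,
  // <local engine context>, parallelism); duplicates across files collapse in the Set.
  static Set<String> flatten(Map<String, Set<String>> markersByFile) {
    return markersByFile.values().stream()
        .flatMap(Set::stream)
        .collect(Collectors.toSet());
  }
}
```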
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +168,50 @@ public boolean doesMarkerDirExist(String markerDir) {
* @param markerName marker name
* @return the {@code CompletableFuture} instance for the request
*/
- public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName) {
+ public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName,
+ String batchInterval, String period, String maxAllowableHeartbeatIntervalInMs,
+ String basePath, String earlyConflictDetectionEnable,
+ String earlyConflictDetectionClassName) {
+ // Step1 do early conflict detection if enable
+ if (Boolean.parseBoolean(earlyConflictDetectionEnable)) {
+ try {
+ synchronized (earlyConflictDetectionLock) {
+ if (earlyConflictDetectionStrategy == null) {
+ earlyConflictDetectionStrategy = ReflectionUtils.loadClass(earlyConflictDetectionClassName);
+ }
+
+ if (!markerDir.equalsIgnoreCase(currentMarkerDir)) {
+ this.currentMarkerDir = markerDir;
+ Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
+ Set<HoodieInstant> oldInstants = viewManager.getFileSystemView(basePath)
+ .getTimeline()
+ .filterCompletedInstants()
+ .filter(instant -> actions.contains(instant.getAction()))
+ .getInstants()
+ .collect(Collectors.toSet());
+
+ earlyConflictDetectionStrategy.fresh(batchInterval, period, markerDir, basePath, maxAllowableHeartbeatIntervalInMs, fileSystem,
+ this, oldInstants);
+ }
+ }
+
+ if (earlyConflictDetectionStrategy.hasMarkerConflict()) {
+ earlyConflictDetectionStrategy.resolveMarkerConflict(basePath, markerDir, markerName);
Review Comment:
No exception should be thrown here at the timeline server if a conflict is
detected. The timeline server should simply return false for the marker
creation request and let the executor/write handle resolve the marker
conflict (throw the exception).
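A sketch of the suggested split, using `CompletableFuture<String>` to match the handler's return type: the server side completes with `"false"` on a conflict and the client side raises the error. `MarkerRequestSketch` is hypothetical, and `IllegalStateException` stands in for the `HoodieEarlyConflictDetectionException` imported elsewhere in this PR.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of where the conflict is reported versus where it is raised.
final class MarkerRequestSketch {
  private MarkerRequestSketch() {
  }

  // Timeline-server side: never throw on a detected conflict, just report failure.
  static CompletableFuture<String> createMarker(boolean conflictDetected) {
    if (conflictDetected) {
      return CompletableFuture.completedFuture("false");
    }
    // The real handler would enqueue the marker creation here.
    return CompletableFuture.completedFuture("true");
  }

  // Executor/write-handle side: interpret the response and raise the error locally.
  static void createMarkerOrFail(boolean conflictDetected, String markerName) {
    boolean success = Boolean.parseBoolean(createMarker(conflictDetected).join());
    if (!success) {
      throw new IllegalStateException("Early conflict detected while creating marker " + markerName);
    }
  }
}
```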
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -132,14 +143,54 @@ protected Option<Path> create(String partitionPath, String dataFileName, IOType
HoodieTimer timer = HoodieTimer.start();
String markerFileName = getMarkerFileName(dataFileName, type);
- Map<String, String> paramsMap = new HashMap<>();
+ Map<String, String> paramsMap = initConfigMap(partitionPath, markerFileName);
+ boolean success = executeCreateMarkerRequest(paramsMap, partitionPath, markerFileName);
+ LOG.info("[timeline-server-based] Created marker file " + partitionPath + "/" + markerFileName
+ + " in " + timer.endTimer() + " ms");
+ if (success) {
+ return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName));
+ } else {
+ return Option.empty();
+ }
+ }
+
+ @Override
+ public Option<Path> createWithEarlyConflictDetection(String partitionPath, String dataFileName, IOType type, boolean checkIfExists,
+ HoodieEarlyConflictDetectionStrategy resolutionStrategy,
+ Set<HoodieInstant> completedCommitInstants, HoodieWriteConfig config, String fileId) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ String markerFileName = getMarkerFileName(dataFileName, type);
+ Map<String, String> paramsMap = initConfigMap(partitionPath, markerFileName);
+
+ paramsMap.put(MARKER_CONFLICT_CHECKER_BATCH_INTERVAL, config.getMarkerConflictCheckerBatchInterval());
+ paramsMap.put(MARKER_CONFLICT_CHECKER_PERIOD, config.getMarkerConflictCheckerPeriod());
Review Comment:
nit: group the `MARKER_CONFLICT_CHECKER_*` params together
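A sketch of the grouping, with placeholder values for the two constants. It also converts the values with `String.valueOf`: the configs are declared as `ConfigProperty<Long>` while `paramsMap` is a `Map<String, String>`, so the getters' return types are worth double-checking.

```java
import java.util.Map;

// Hypothetical helper that keeps all MARKER_CONFLICT_CHECKER_* request params in one place.
final class ConflictCheckerParams {
  // Placeholder values; the PR defines the real constants.
  static final String MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = "markerconflictcheckerbatchinterval";
  static final String MARKER_CONFLICT_CHECKER_PERIOD = "markerconflictcheckerperiod";

  private ConflictCheckerParams() {
  }

  // Request params are strings, so the Long-typed config values are converted here.
  static void putConflictCheckerParams(Map<String, String> paramsMap, long batchIntervalMs, long periodMs) {
    paramsMap.put(MARKER_CONFLICT_CHECKER_BATCH_INTERVAL, String.valueOf(batchIntervalMs));
    paramsMap.put(MARKER_CONFLICT_CHECKER_PERIOD, String.valueOf(periodMs));
  }
}
```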
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieLockConfig.java:
##########
@@ -189,6 +190,33 @@ public class HoodieLockConfig extends HoodieConfig {
.withDocumentation("Lock provider class name, this should be subclass of "
+ "org.apache.hudi.client.transaction.ConflictResolutionStrategy");
+ // Pluggable strategies to use when early conflict detection
+ public static final ConfigProperty<String> EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+ .key(LOCK_PREFIX + "early.conflict.detection.strategy")
+ .defaultValue(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName())
+ .sinceVersion("0.12.0")
+ .withDocumentation("Early conflict detection class name, this should be subclass of "
+ + "org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy");
+
+ public static final ConfigProperty<Boolean> EARLY_CONFLICT_DETECTION_ENABLE = ConfigProperty
+ .key(LOCK_PREFIX + "early.conflict.detection.enable")
+ .defaultValue(false)
+ .sinceVersion("0.12.0")
+ .withDocumentation("Enable early conflict detection based on markers. It will try to detect writing conflict before create markers and fast fail"
+ + " which will release cluster resources as soon as possible.");
+
+ public static final ConfigProperty<Long> MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = ConfigProperty
+ .key(LOCK_PREFIX + "early.conflict.async.checker.batch.interval")
+ .defaultValue(30000L)
+ .sinceVersion("0.12.0")
+ .withDocumentation("Used for timeline based marker AsyncTimelineMarkerConflictResolutionStrategy. The time to delay first async marker conflict checking.");
+
+ public static final ConfigProperty<Long> MARKER_CONFLICT_CHECKER_PERIOD = ConfigProperty
+ .key(LOCK_PREFIX + "early.conflict.async.checker.period")
+ .defaultValue(30000L)
+ .sinceVersion("0.12.0")
+ .withDocumentation("Used for timeline based marker AsyncTimelineMarkerConflictResolutionStrategy. The period between each marker conflict checking.");
Review Comment:
Instead of using the prefix of `hoodie.write.lock.` (`LOCK_PREFIX`), we
should use `hoodie.write.concurrency.` (creating a new constant String for
that) and put these configs into `HoodieWriteConfig`, since the early conflict
detection falls under the concurrency control.
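A sketch of the resulting keys under the suggested prefix. `CONCURRENCY_PREFIX` and `ConcurrencyConfigKeys` are hypothetical names; in `HoodieWriteConfig` each key would become a `ConfigProperty` built with `.key(CONCURRENCY_PREFIX + ...)`.

```java
// Hypothetical sketch of the key layout under the suggested prefix.
final class ConcurrencyConfigKeys {
  // Suggested prefix from this review; the constant name is hypothetical.
  static final String CONCURRENCY_PREFIX = "hoodie.write.concurrency.";

  static final String EARLY_CONFLICT_DETECTION_ENABLE =
      CONCURRENCY_PREFIX + "early.conflict.detection.enable";
  static final String EARLY_CONFLICT_DETECTION_STRATEGY =
      CONCURRENCY_PREFIX + "early.conflict.detection.strategy";
  static final String ASYNC_CHECKER_BATCH_INTERVAL =
      CONCURRENCY_PREFIX + "early.conflict.async.checker.batch.interval";
  static final String ASYNC_CHECKER_PERIOD =
      CONCURRENCY_PREFIX + "early.conflict.async.checker.period";

  private ConcurrencyConfigKeys() {
  }
}
```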
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.handlers.MarkerHandler;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class MarkerCheckerRunnable implements Runnable {
+ private static final Logger LOG = LogManager.getLogger(MarkerCheckerRunnable.class);
+
+ private MarkerHandler markerHandler;
+ private String markerDir;
+ private String basePath;
+ private FileSystem fs;
+ private AtomicBoolean hasConflict;
+ private long maxAllowableHeartbeatIntervalInMs;
+ private Set<HoodieInstant> oldInstants;
+
+ public MarkerCheckerRunnable(AtomicBoolean hasConflict, MarkerHandler markerHandler, String markerDir,
+ String basePath, FileSystem fileSystem, long maxAllowableHeartbeatIntervalInMs,
+ Set<HoodieInstant> oldInstants) {
+ this.markerHandler = markerHandler;
+ this.markerDir = markerDir;
+ this.basePath = basePath;
+ this.fs = fileSystem;
+ this.hasConflict = hasConflict;
+ this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
+ this.oldInstants = oldInstants;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (!fs.exists(new Path(markerDir))) {
+ return;
+ }
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ Set<String> currentInstantAllMarkers = markerHandler.getAllMarkers(markerDir);
+ Path tempPath = new Path(basePath + Path.SEPARATOR + HoodieTableMetaClient.TEMPFOLDER_NAME);
+
+ List<Path> instants = MarkerUtils.getAllMarkerDir(tempPath, fs);
+ List<String> candidate = getCandidateInstants(instants, markerDirToInstantTime(markerDir));
+ Set<String> tableMarkers = candidate.stream().flatMap(instant -> {
+ return MarkerUtils.readTimelineServerBasedMarkersFromFileSystemLocally(instant, fs).stream();
+ }).collect(Collectors.toSet());
+
+ Set<String> currentFileIDs = currentInstantAllMarkers.stream().map(this::makerToPartitionAndFileID).collect(Collectors.toSet());
+ Set<String> tableFilesIDs = tableMarkers.stream().map(this::makerToPartitionAndFileID).collect(Collectors.toSet());
+
+ currentFileIDs.retainAll(tableFilesIDs);
+
+ if (!currentFileIDs.isEmpty() || hasCommitConflict(currentInstantAllMarkers, basePath)) {
+ LOG.warn("Conflict writing detected based on markers!\n"
+ + "Conflict markers: " + currentInstantAllMarkers + "\n"
+ + "Table markers: " + tableMarkers);
+ hasConflict.compareAndSet(false, true);
+ }
+ LOG.info("Finish batch marker checker in " + timer.endTimer() + " ms");
+
+ } catch (IOException e) {
+ throw new HoodieIOException("IOException occurs during checking marker conflict");
+ }
+ }
+
+ /**
+ * Get Candidate Instant to do conflict checking:
+ * 1. Skip current writer related instant(currentInstantTime)
+ * 2. Skip all instants after currentInstantTime
+ * 3. Skip dead writers related instants based on heart-beat
+ * @param instants
+ * @return
+ */
+ private List<String> getCandidateInstants(List<Path> instants, String currentInstantTime) {
Review Comment:
Can we adapt the common logic from `ConflictResolutionStrategy` instead of
reinventing similar logic?
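If the existing `ConflictResolutionStrategy` helpers do not fit directly, one option is to express the three selection rules from the Javadoc above as a single pure function that both code paths share. `CandidateInstants.select` is hypothetical; it assumes instant times are fixed-width timestamp strings (so lexicographic order matches time order) and that a missing heartbeat means the writer is dead.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical shared helper for selecting instants to check for marker conflicts.
final class CandidateInstants {
  private CandidateInstants() {
  }

  static List<String> select(List<String> instants, String currentInstant,
                             Map<String, Long> lastHeartbeatMs, long nowMs,
                             long maxAllowableHeartbeatIntervalMs) {
    return instants.stream()
        // Rules 1 and 2: skip the current instant and every instant after it.
        .filter(ts -> ts.compareTo(currentInstant) < 0)
        // Rule 3: skip writers whose heartbeat is missing or too old.
        .filter(ts -> {
          Long heartbeat = lastHeartbeatMs.get(ts);
          return heartbeat != null && nowMs - heartbeat <= maxAllowableHeartbeatIntervalMs;
        })
        .collect(Collectors.toList());
  }
}
```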
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -132,14 +143,54 @@ protected Option<Path> create(String partitionPath, String dataFileName, IOType
HoodieTimer timer = HoodieTimer.start();
String markerFileName = getMarkerFileName(dataFileName, type);
- Map<String, String> paramsMap = new HashMap<>();
+ Map<String, String> paramsMap = initConfigMap(partitionPath, markerFileName);
+ boolean success = executeCreateMarkerRequest(paramsMap, partitionPath, markerFileName);
+ LOG.info("[timeline-server-based] Created marker file " + partitionPath + "/" + markerFileName
+ + " in " + timer.endTimer() + " ms");
+ if (success) {
+ return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath, partitionPath), markerFileName));
+ } else {
+ return Option.empty();
+ }
+ }
+
+ @Override
+ public Option<Path> createWithEarlyConflictDetection(String partitionPath, String dataFileName, IOType type, boolean checkIfExists,
+ HoodieEarlyConflictDetectionStrategy resolutionStrategy,
+ Set<HoodieInstant> completedCommitInstants, HoodieWriteConfig config, String fileId) {
+ HoodieTimer timer = new HoodieTimer().startTimer();
+ String markerFileName = getMarkerFileName(dataFileName, type);
+ Map<String, String> paramsMap = initConfigMap(partitionPath, markerFileName);
+
+ paramsMap.put(MARKER_CONFLICT_CHECKER_BATCH_INTERVAL, config.getMarkerConflictCheckerBatchInterval());
+ paramsMap.put(MARKER_CONFLICT_CHECKER_PERIOD, config.getMarkerConflictCheckerPeriod());
paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath.toString());
Review Comment:
this should be moved to `initConfigMap()` as well.
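A sketch of `initConfigMap` owning the marker directory param, with placeholder constant values. The signature adds `markerDirPath` only so the example is self-contained (in the class it is a field), and the `partitionPath/markerFileName` encoding of the marker name is an assumption.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: initConfigMap sets every param shared by both create paths.
final class MarkerRequestParams {
  // Placeholder values; the real constants are defined in Hudi.
  static final String MARKER_DIR_PATH_PARAM = "markerdirpath";
  static final String MARKER_NAME_PARAM = "markername";

  private MarkerRequestParams() {
  }

  static Map<String, String> initConfigMap(String markerDirPath, String partitionPath, String markerFileName) {
    Map<String, String> paramsMap = new HashMap<>();
    // Needed by create() and createWithEarlyConflictDetection(), so it is set once here.
    paramsMap.put(MARKER_DIR_PATH_PARAM, markerDirPath);
    // Assumed encoding of the marker name: "<partitionPath>/<markerFileName>".
    paramsMap.put(MARKER_NAME_PARAM,
        partitionPath.isEmpty() ? markerFileName : partitionPath + "/" + markerFileName);
    return paramsMap;
  }
}
```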
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.conflict.detection.HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ConcurrentModificationException;
+import java.util.Set;
+
+/**
+ * This strategy is used for direct marker writers, trying to do early conflict detection.
+ * It will use fileSystem api like list and exist directly to check if there is any marker file conflict.
+ */
+public class SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy extends HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy {
+ private static final Logger LOG = LogManager.getLogger(SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+
+ @Override
+ public boolean hasMarkerConflict(String basePath, FileSystem fs, String partitionPath, String fileId, String instantTime,
Review Comment:
The logic here looks the same as
`SimpleDirectMarkerBasedEarlyConflictDetectionStrategy`. Does it make sense to
move the transaction management inside the strategy to make it easier to
understand?
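A sketch of that structure: the transactional strategy reuses the non-transactional check unchanged and only wraps check-then-create in a lock it owns. Both classes are hypothetical, a `Set` stands in for the marker files on storage, and the injected `Lock` stands in for the file-level transaction manager.

```java
import java.util.Set;
import java.util.concurrent.locks.Lock;

// Hypothetical non-transactional strategy: check other writers' markers, then create ours.
class SimpleDirectStrategySketch {
  // Stands in for the direct marker files on storage, keyed by file ID.
  protected final Set<String> markerFileIds;

  SimpleDirectStrategySketch(Set<String> markerFileIds) {
    this.markerFileIds = markerFileIds;
  }

  boolean hasMarkerConflict(String fileId) {
    return markerFileIds.contains(fileId);
  }

  // Returns true if the marker was created, false if a conflict was detected.
  boolean detectAndCreate(String fileId) {
    if (hasMarkerConflict(fileId)) {
      return false;
    }
    markerFileIds.add(fileId);
    return true;
  }
}

// Hypothetical transactional variant: same check, but the strategy owns the lock, so
// check-then-create is atomic with respect to other writers sharing that lock.
class TransactionalDirectStrategySketch extends SimpleDirectStrategySketch {
  private final Lock lock;

  TransactionalDirectStrategySketch(Set<String> markerFileIds, Lock lock) {
    super(markerFileIds);
    this.lock = lock;
  }

  @Override
  boolean detectAndCreate(String fileId) {
    lock.lock();
    try {
      return super.detectAndCreate(fileId);
    } finally {
      lock.unlock();
    }
  }
}
```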
##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+public abstract class HoodieTransactionDirectMarkerBasedEarlyConflictDetectionStrategy
Review Comment:
Do we still need this?
##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientMultiWriter.java:
##########
@@ -658,4 +740,56 @@ private JavaRDD<WriteStatus>
startCommitForUpdate(HoodieWriteConfig writeConfig,
assertNoWriteErrors(statuses);
return result;
}
+
+ public static Stream<Arguments> configParams() {
+ Object[][] data =
+ new Object[][] {{"COPY_ON_WRITE",
MarkerType.TIMELINE_SERVER_BASED.name()}, {"MERGE_ON_READ",
MarkerType.TIMELINE_SERVER_BASED.name()},
+ {"MERGE_ON_READ", MarkerType.DIRECT.name()}, {"COPY_ON_WRITE",
MarkerType.DIRECT.name()}};
+ return Stream.of(data).map(Arguments::of);
+ }
+
+ private HoodieWriteConfig buildWriteConfigForEarlyConflictDetect(String markerType, Properties properties) {
+ if (markerType.equalsIgnoreCase(MarkerType.DIRECT.name())) {
+ return getConfigBuilder()
+ .withHeartbeatIntervalInMs(3600 * 1000)
+ .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+ .withStorageType(FileSystemViewStorageType.MEMORY)
+ .withSecondaryStorageType(FileSystemViewStorageType.MEMORY).build())
+ .withCleanConfig(HoodieCleanConfig.newBuilder()
+ .withFailedWritesCleaningPolicy(HoodieFailedWritesCleaningPolicy.LAZY)
+ .withAutoClean(false).build())
+ .withArchivalConfig(HoodieArchivalConfig.newBuilder()
+ .withAutoArchive(false).build())
+ .withWriteConcurrencyMode(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL)
+ .withMarkersType(MarkerType.DIRECT.name())
+ .withLockConfig(HoodieLockConfig.newBuilder().withLockProvider(InProcessLockProvider.class)
+ .withEarlyConflictDetectionEnable(true)
+ .withEarlyConflictDetectionStrategy(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName())
Review Comment:
For direct markers, should we also test
`SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy`?
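One possible way to cover both direct-marker strategies is to add the strategy as a third test parameter, as sketched below. Plain `Object[]` rows stand in for JUnit's `Arguments` so the snippet compiles without JUnit; in the real test each row would presumably be wrapped with `Arguments.of`, the strings would be `...class.getName()`, and `buildWriteConfigForEarlyConflictDetect` would take the strategy class name instead of hard-coding it. The class and method names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical sketch: widen the parameter matrix so DIRECT markers are exercised
// with both direct-marker strategies, for both table types.
class EarlyConflictDetectionParamsSketch {
  static final String SIMPLE_DIRECT = "SimpleDirectMarkerBasedEarlyConflictDetectionStrategy";
  static final String SIMPLE_TRANSACTION_DIRECT = "SimpleTransactionDirectMarkerBasedEarlyConflictDetectionStrategy";

  static Stream<Object[]> directMarkerParams() {
    List<Object[]> rows = new ArrayList<>();
    for (String tableType : new String[] {"COPY_ON_WRITE", "MERGE_ON_READ"}) {
      for (String strategy : new String[] {SIMPLE_DIRECT, SIMPLE_TRANSACTION_DIRECT}) {
        // Each row: table type, marker type, strategy (simple) class name.
        rows.add(new Object[] {tableType, "DIRECT", strategy});
      }
    }
    return rows.stream();
  }
}
```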
##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+public interface HoodieEarlyConflictDetectionStrategy {
Review Comment:
Should the common methods be moved into this interface?
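For illustration only, the two strategy calls that `MarkerHandler` makes in this PR (`hasMarkerConflict()` and `resolveMarkerConflict(basePath, markerDir, markerName)`) could be lifted into the shared interface so callers program against it. The interface and class names below are hypothetical, and whether the direct-marker strategies fit this exact signature is an open assumption.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical: methods both strategy families could share, lifted into the interface.
interface EarlyConflictDetectionStrategySketch {
  /** Returns true if a conflicting write has been detected. */
  boolean hasMarkerConflict();

  /** Reacts to a detected conflict, here by failing the current write. */
  void resolveMarkerConflict(String basePath, String markerDir, String markerName);
}

// Minimal implementation that fails fast once a conflict flag has been raised.
class FailFastStrategySketch implements EarlyConflictDetectionStrategySketch {
  private final AtomicBoolean conflict = new AtomicBoolean(false);

  void markConflict() {
    conflict.set(true);
  }

  @Override
  public boolean hasMarkerConflict() {
    return conflict.get();
  }

  @Override
  public void resolveMarkerConflict(String basePath, String markerDir, String markerName) {
    throw new IllegalStateException(
        "Conflict detected in " + markerDir + " for " + markerName + " under " + basePath);
  }
}
```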
##########
hudi-common/src/main/java/org/apache/hudi/common/table/marker/MarkerOperation.java:
##########
@@ -29,6 +29,12 @@ public class MarkerOperation implements Serializable {
public static final String MARKER_DIR_PATH_PARAM = "markerdirpath";
public static final String MARKER_NAME_PARAM = "markername";
+ public static final String MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = "batchinterval";
+ public static final String MARKER_CONFLICT_CHECKER_PERIOD = "period";
+ public static final String MARKER_CONFLICT_CHECKER_HEART_BEAT_INTERVAL = "heartbeatinterval";
+ public static final String MARKER_BASEPATH_PARAM = "basepath";
+ public static final String MARKER_CONFLICT_CHECKER_ENABLE = "HoodieEarlyConflictDetectionStrategy";
+ public static final String MARKER_CONFLICT_CHECKER_STRATEGY = "earlyconflictdetectionstrategy";
Review Comment:
Let's add `marker` as a prefix to these parameter names and avoid capital letters in them.
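A sketch of what the renamed values might look like; the exact strings are illustrative, not what the PR settled on. Note that the current value of `MARKER_CONFLICT_CHECKER_ENABLE`, `"HoodieEarlyConflictDetectionStrategy"`, both contains capitals and reads like a strategy name rather than an on/off flag.

```java
// Hypothetical renaming: every request parameter value starts with "marker" and is all
// lowercase. The string values are illustrative only.
final class MarkerOperationParamsSketch {
  static final String MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = "markerconflictcheckerbatchinterval";
  static final String MARKER_CONFLICT_CHECKER_PERIOD = "markerconflictcheckerperiod";
  static final String MARKER_CONFLICT_CHECKER_HEART_BEAT_INTERVAL = "markerconflictcheckerheartbeatinterval";
  static final String MARKER_BASEPATH_PARAM = "markerbasepath";
  static final String MARKER_CONFLICT_CHECKER_ENABLE = "markerconflictcheckerenable";
  static final String MARKER_CONFLICT_CHECKER_STRATEGY = "markerconflictcheckerstrategy";

  // All parameter names, handy for a consistency check in tests.
  static final String[] ALL = {
      MARKER_CONFLICT_CHECKER_BATCH_INTERVAL, MARKER_CONFLICT_CHECKER_PERIOD,
      MARKER_CONFLICT_CHECKER_HEART_BEAT_INTERVAL, MARKER_BASEPATH_PARAM,
      MARKER_CONFLICT_CHECKER_ENABLE, MARKER_CONFLICT_CHECKER_STRATEGY};

  private MarkerOperationParamsSketch() {
  }
}
```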
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +168,50 @@ public boolean doesMarkerDirExist(String markerDir) {
* @param markerName marker name
* @return the {@code CompletableFuture} instance for the request
*/
- public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName) {
+ public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName,
+ String batchInterval, String period, String maxAllowableHeartbeatIntervalInMs,
+ String basePath, String earlyConflictDetectionEnable,
+ String earlyConflictDetectionClassName) {
+ // Step1 do early conflict detection if enable
Review Comment:
Should the async conflict detection thread be started in the constructor of `MarkerHandler` instead? It does not make sense to start the checker on every request, especially since the checker already performs detection asynchronously and in batches.
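A minimal sketch of this suggestion, using only JDK classes: the handler schedules the checker once at construction, and each marker-creation request only reads the shared conflict flag. `MarkerHandlerCheckerSketch` and its constructor parameters are hypothetical; the real handler would build the checker (e.g. `MarkerCheckerRunnable`) from its own configuration.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Hypothetical sketch: the async checker is scheduled once per handler, not per request.
class MarkerHandlerCheckerSketch implements AutoCloseable {
  private final AtomicBoolean hasConflict = new AtomicBoolean(false);
  private final ScheduledExecutorService checkerExecutor = Executors.newSingleThreadScheduledExecutor();

  MarkerHandlerCheckerSketch(Function<AtomicBoolean, Runnable> checkerFactory, long periodMs) {
    // The checker receives the shared flag and sets it when it finds a conflict.
    checkerExecutor.scheduleAtFixedRate(checkerFactory.apply(hasConflict), 0L, periodMs, TimeUnit.MILLISECONDS);
  }

  /** Cheap per-request path: fail if the background checker has flagged a conflict. */
  String createMarker(String markerName) {
    if (hasConflict.get()) {
      throw new IllegalStateException("Early conflict detected; rejecting marker " + markerName);
    }
    return markerName;
  }

  @Override
  public void close() {
    checkerExecutor.shutdownNow();
  }
}
```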
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +168,50 @@ public boolean doesMarkerDirExist(String markerDir) {
* @param markerName marker name
* @return the {@code CompletableFuture} instance for the request
*/
- public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName) {
+ public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName,
+ String batchInterval, String period, String maxAllowableHeartbeatIntervalInMs,
+ String basePath, String earlyConflictDetectionEnable,
+ String earlyConflictDetectionClassName) {
+ // Step1 do early conflict detection if enable
+ if (Boolean.parseBoolean(earlyConflictDetectionEnable)) {
+ try {
+ synchronized (earlyConflictDetectionLock) {
+ if (earlyConflictDetectionStrategy == null) {
+ earlyConflictDetectionStrategy = ReflectionUtils.loadClass(earlyConflictDetectionClassName);
+ }
Review Comment:
Initialize this in the constructor?
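A sketch of eager initialization, assuming the strategy class name is available as timeline-server configuration at startup rather than only as a per-request parameter (in the current diff it arrives with each request). Plain JDK reflection stands in for Hudi's `ReflectionUtils.loadClass`; all class names are hypothetical. A side benefit is that a misconfigured class name fails at startup instead of on the first request, and the per-request path needs neither a null check nor the `synchronized` block.

```java
// Hypothetical sketch: load the strategy once when the handler is built.
interface ConflictStrategySketch {
  boolean hasMarkerConflict();
}

class NoConflictStrategySketch implements ConflictStrategySketch {
  @Override
  public boolean hasMarkerConflict() {
    return false;
  }
}

class EagerMarkerHandlerSketch {
  // Final field: set once in the constructor, safely visible to all request threads.
  private final ConflictStrategySketch earlyConflictDetectionStrategy;

  EagerMarkerHandlerSketch(boolean earlyConflictDetectionEnabled, String strategyClassName) {
    this.earlyConflictDetectionStrategy =
        earlyConflictDetectionEnabled ? loadStrategy(strategyClassName) : null;
  }

  private static ConflictStrategySketch loadStrategy(String className) {
    try {
      // Plain reflection stands in for Hudi's ReflectionUtils.loadClass here.
      return (ConflictStrategySketch) Class.forName(className).getDeclaredConstructor().newInstance();
    } catch (ReflectiveOperationException e) {
      throw new IllegalArgumentException("Cannot instantiate " + className, e);
    }
  }

  boolean isConflictDetected() {
    return earlyConflictDetectionStrategy != null && earlyConflictDetectionStrategy.hasMarkerConflict();
  }
}
```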
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/MarkerHandler.java:
##########
@@ -150,7 +168,50 @@ public boolean doesMarkerDirExist(String markerDir) {
* @param markerName marker name
* @return the {@code CompletableFuture} instance for the request
*/
- public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName) {
+ public CompletableFuture<String> createMarker(Context context, String markerDir, String markerName,
+ String batchInterval, String period, String maxAllowableHeartbeatIntervalInMs,
+ String basePath, String earlyConflictDetectionEnable,
+ String earlyConflictDetectionClassName) {
+ // Step1 do early conflict detection if enable
+ if (Boolean.parseBoolean(earlyConflictDetectionEnable)) {
+ try {
+ synchronized (earlyConflictDetectionLock) {
+ if (earlyConflictDetectionStrategy == null) {
+ earlyConflictDetectionStrategy = ReflectionUtils.loadClass(earlyConflictDetectionClassName);
+ }
+
+ if (!markerDir.equalsIgnoreCase(currentMarkerDir)) {
+ this.currentMarkerDir = markerDir;
+ Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, DELTA_COMMIT_ACTION, REPLACE_COMMIT_ACTION);
+ Set<HoodieInstant> oldInstants = viewManager.getFileSystemView(basePath)
+ .getTimeline()
+ .filterCompletedInstants()
+ .filter(instant -> actions.contains(instant.getAction()))
+ .getInstants()
+ .collect(Collectors.toSet());
+
+ earlyConflictDetectionStrategy.fresh(batchInterval, period, markerDir, basePath, maxAllowableHeartbeatIntervalInMs, fileSystem,
+ this, oldInstants);
+ }
+ }
+
+ if (earlyConflictDetectionStrategy.hasMarkerConflict()) {
+ earlyConflictDetectionStrategy.resolveMarkerConflict(basePath, markerDir, markerName);
+ }
Review Comment:
Only this check should be kept, and it should be performed during batch marker creation in `MarkerDirState::processMarkerCreationRequests`.
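A rough sketch of running the check once per batch: if a conflict has been flagged, every pending request in the batch fails; otherwise the markers are created. `MarkerDirStateSketch`, `MarkerCreationRequestSketch`, and completing futures with a `Boolean` are assumptions for illustration, not the actual `MarkerDirState` internals.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.BooleanSupplier;

// Hypothetical pending request: a marker name plus the future the caller waits on.
class MarkerCreationRequestSketch {
  final String markerName;
  final CompletableFuture<Boolean> future = new CompletableFuture<>();

  MarkerCreationRequestSketch(String markerName) {
    this.markerName = markerName;
  }
}

// Hypothetical MarkerDirState-like batch processor: one conflict check per batch.
class MarkerDirStateSketch {
  private final Set<String> markers = new HashSet<>();
  private final BooleanSupplier hasMarkerConflict;

  MarkerDirStateSketch(BooleanSupplier hasMarkerConflict) {
    this.hasMarkerConflict = hasMarkerConflict;
  }

  void processMarkerCreationRequests(List<MarkerCreationRequestSketch> pending) {
    if (hasMarkerConflict.getAsBoolean()) {
      // Fail the whole batch instead of checking each request individually.
      IllegalStateException conflict = new IllegalStateException("Early conflict detected");
      pending.forEach(r -> r.future.completeExceptionally(conflict));
      return;
    }
    for (MarkerCreationRequestSketch r : pending) {
      // Set.add returns false for a duplicate marker, i.e. it already existed.
      r.future.complete(markers.add(r.markerName));
    }
  }
}
```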
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java:
##########
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.timeline.service.handlers.marker;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.timeline.service.handlers.MarkerHandler;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class MarkerCheckerRunnable implements Runnable {
+ private static final Logger LOG = LogManager.getLogger(MarkerCheckerRunnable.class);
+
+ private MarkerHandler markerHandler;
+ private String markerDir;
+ private String basePath;
+ private FileSystem fs;
+ private AtomicBoolean hasConflict;
+ private long maxAllowableHeartbeatIntervalInMs;
+ private Set<HoodieInstant> oldInstants;
+
+ public MarkerCheckerRunnable(AtomicBoolean hasConflict, MarkerHandler markerHandler, String markerDir,
Review Comment:
Is it easy to add a unit test for this checker?
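One way to make the checker easy to unit test is to keep the filesystem and timeline I/O in the `Runnable` and move the decision into a pure static method. The rule below (a conflict exists when a file ID the current writer touches is also touched by another in-flight writer or by a commit completed after this writer started, i.e. not in `oldInstants`) is an assumption about the checker's intent, and `MarkerConflictLogicSketch` is a hypothetical name.

```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Set;

// Hypothetical refactoring for testability: a pure decision function with no I/O.
final class MarkerConflictLogicSketch {
  private MarkerConflictLogicSketch() {
  }

  /**
   * Assumed rule: conflict if any file ID of the current writer also appears among other
   * in-flight writers' file IDs or among file IDs written by newly completed commits.
   */
  static boolean hasConflict(Set<String> currentFileIds,
                             Collection<Set<String>> otherInflightFileIds,
                             Collection<Set<String>> newlyCompletedFileIds) {
    Set<String> others = new HashSet<>();
    otherInflightFileIds.forEach(others::addAll);
    newlyCompletedFileIds.forEach(others::addAll);
    for (String fileId : currentFileIds) {
      if (others.contains(fileId)) {
        return true;
      }
    }
    return false;
  }
}
```

A JUnit test could then call `hasConflict` directly with in-memory sets, while the `Runnable` only gathers its inputs (markers via `MarkerUtils`, commit metadata for instants not in `oldInstants`).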
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java:
##########
@@ -0,0 +1,193 @@
+package org.apache.hudi.timeline.service.handlers.marker;
+
+public class MarkerCheckerRunnable implements Runnable {
+ private static final Logger LOG = LogManager.getLogger(MarkerCheckerRunnable.class);
+
+ private MarkerHandler markerHandler;
+ private String markerDir;
+ private String basePath;
+ private FileSystem fs;
+ private AtomicBoolean hasConflict;
+ private long maxAllowableHeartbeatIntervalInMs;
+ private Set<HoodieInstant> oldInstants;
+
+ public MarkerCheckerRunnable(AtomicBoolean hasConflict, MarkerHandler markerHandler, String markerDir,
+ String basePath, FileSystem fileSystem, long maxAllowableHeartbeatIntervalInMs,
+ Set<HoodieInstant> oldInstants) {
+ this.markerHandler = markerHandler;
+ this.markerDir = markerDir;
+ this.basePath = basePath;
+ this.fs = fileSystem;
+ this.hasConflict = hasConflict;
+ this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
+ this.oldInstants = oldInstants;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (!fs.exists(new Path(markerDir))) {
+ return;
+ }
+
+ HoodieTimer timer = new HoodieTimer().startTimer();
Review Comment:
nit: use `HoodieTimer.start()` instead.
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/handlers/marker/MarkerCheckerRunnable.java:
##########
@@ -0,0 +1,193 @@
+package org.apache.hudi.timeline.service.handlers.marker;
+
+public class MarkerCheckerRunnable implements Runnable {
Review Comment:
nit: rename to `MarkerBasedEarlyConflictDetectionRunnable`?
--
This is an automated message from the Apache Git Service.