yihua commented on code in PR #6133:
URL: https://github.com/apache/hudi/pull/6133#discussion_r1063209840
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -549,6 +553,50 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("When table is upgraded from pre 0.12 to 0.12, we
check for \"default\" partition and fail if found one. "
+ "Users are expected to rewrite the data in those partitions.
Enabling this config will bypass this validation");
+ // Pluggable strategies to use when early conflict detection
+ public static final ConfigProperty<String>
EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+ .key(CONCURRENCY_PREFIX + "early.conflict.detection.strategy")
+ .noDefaultValue()
+ .sinceVersion("0.13.0")
+ .withInferFunction(cfg -> {
+ MarkerType markerType =
MarkerType.valueOf(cfg.getString(MARKERS_TYPE).toUpperCase());
+ switch (markerType) {
+ case DIRECT:
+ return
Option.of(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName());
+ case TIMELINE_SERVER_BASED:
+ default:
+ return
Option.of(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName());
+ }
+ })
+ .withDocumentation("Early conflict detection class name, this should be
subclass of "
+ +
"org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy");
+
+ public static final ConfigProperty<Boolean> EARLY_CONFLICT_DETECTION_ENABLE
= ConfigProperty
+ .key(CONCURRENCY_PREFIX + "early.conflict.detection.enable")
+ .defaultValue(false)
+ .sinceVersion("0.13.0")
+ .withDocumentation("Enable early conflict detection based on markers. It
will try to detect writing conflict before create markers and fast fail"
+ + " which will release cluster resources as soon as possible.");
+
+ public static final ConfigProperty<Long>
MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = ConfigProperty
Review Comment:
Add unit for the time configs
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/TimelineServerBasedWriteMarkers.java:
##########
@@ -148,13 +185,30 @@ protected Option<Path> create(String partitionPath,
String dataFileName, IOType
} catch (IOException e) {
throw new HoodieRemoteException("Failed to create marker file " +
partitionPath + "/" + markerFileName, e);
}
- LOG.info("[timeline-server-based] Created marker file " + partitionPath +
"/" + markerFileName
- + " in " + timer.endTimer() + " ms");
- if (success) {
- return Option.of(new Path(FSUtils.getPartitionPath(markerDirPath,
partitionPath), markerFileName));
+ return success;
+ }
+
+ /**
+ * init create marker related config maps.
+ * @param partitionPath
+ * @param markerFileName
+ * @return
+ */
+ private Map<String, String> initConfigMap(String partitionPath, String
markerFileName, boolean initEarlyConflictConfigs) {
Review Comment:
nit: `initEarlyConflictConfigs` -> `initConflictDetectionConfigs`
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java:
##########
@@ -549,6 +553,50 @@ public class HoodieWriteConfig extends HoodieConfig {
.withDocumentation("When table is upgraded from pre 0.12 to 0.12, we
check for \"default\" partition and fail if found one. "
+ "Users are expected to rewrite the data in those partitions.
Enabling this config will bypass this validation");
+ // Pluggable strategies to use when early conflict detection
+ public static final ConfigProperty<String>
EARLY_CONFLICT_DETECTION_STRATEGY_CLASS_NAME = ConfigProperty
+ .key(CONCURRENCY_PREFIX + "early.conflict.detection.strategy")
+ .noDefaultValue()
+ .sinceVersion("0.13.0")
+ .withInferFunction(cfg -> {
+ MarkerType markerType =
MarkerType.valueOf(cfg.getString(MARKERS_TYPE).toUpperCase());
+ switch (markerType) {
+ case DIRECT:
+ return
Option.of(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class.getName());
+ case TIMELINE_SERVER_BASED:
+ default:
+ return
Option.of(AsyncTimelineMarkerEarlyConflictDetectionStrategy.class.getName());
+ }
+ })
+ .withDocumentation("Early conflict detection class name, this should be
subclass of "
+ +
"org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy");
+
+ public static final ConfigProperty<Boolean> EARLY_CONFLICT_DETECTION_ENABLE
= ConfigProperty
+ .key(CONCURRENCY_PREFIX + "early.conflict.detection.enable")
+ .defaultValue(false)
+ .sinceVersion("0.13.0")
+ .withDocumentation("Enable early conflict detection based on markers. It
will try to detect writing conflict before create markers and fast fail"
+ + " which will release cluster resources as soon as possible.");
+
+ public static final ConfigProperty<Long>
MARKER_CONFLICT_CHECKER_BATCH_INTERVAL = ConfigProperty
Review Comment:
naming: `checker` -> `detector` in all places.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -43,23 +47,52 @@
public class LockManager implements Serializable, AutoCloseable {
private static final Logger LOG = LogManager.getLogger(LockManager.class);
- private final HoodieWriteConfig writeConfig;
- private final LockConfiguration lockConfiguration;
- private final SerializableConfiguration hadoopConf;
- private final int maxRetries;
- private final long maxWaitTimeInMs;
+ private HoodieWriteConfig writeConfig;
+ private LockConfiguration lockConfiguration;
+ private SerializableConfiguration hadoopConf;
+ private int maxRetries;
+ private long maxWaitTimeInMs;
Review Comment:
These should still be final.
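One way to keep these fields `final` while still supporting a second constructor is to have every public constructor delegate to a single private one, so each field is assigned exactly once. The following is only a minimal sketch under simplified assumptions: `FileLevelLockSettings` and the `example.lock.*` property keys are hypothetical stand-ins, not Hudi classes or configs.

```java
import java.util.Properties;

// Hypothetical stand-in for a lock manager's configuration state; not Hudi code.
final class FileLevelLockSettings {
  private final Properties lockProps;
  private final String lockKey;
  private final int maxRetries;
  private final long maxWaitTimeInMs;

  // Table-level lock: no per-file key.
  FileLevelLockSettings(Properties writeProps) {
    this(writeProps, null);
  }

  // File-level lock: the key is derived from the partition path and file ID.
  FileLevelLockSettings(Properties writeProps, String partitionPath, String fileId) {
    this(writeProps, partitionPath + "/" + fileId);
  }

  // Single place where every final field is assigned exactly once.
  private FileLevelLockSettings(Properties writeProps, String lockKey) {
    Properties props = new Properties();
    props.putAll(writeProps); // copy, so the caller's properties are not mutated
    if (lockKey != null) {
      props.setProperty("example.lock.key", lockKey); // illustrative key name
    }
    this.lockProps = props;
    this.lockKey = lockKey;
    this.maxRetries = Integer.parseInt(props.getProperty("example.lock.retries", "3"));
    this.maxWaitTimeInMs = Long.parseLong(props.getProperty("example.lock.wait.ms", "1000"));
  }

  String getLockKey() {
    return lockKey;
  }

  int getMaxRetries() {
    return maxRetries;
  }

  long getMaxWaitTimeInMs() {
    return maxWaitTimeInMs;
  }

  Properties getLockProps() {
    return lockProps;
  }
}
```

With this shape the compiler enforces single assignment, which a shared `init(...)` method cannot do because it runs after construction has started.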
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java:
##########
@@ -52,16 +58,46 @@ public WriteMarkers(String basePath, String
markerFolderPath, String instantTime
this.instantTime = instantTime;
}
+ public Option<Path> create(String partitionPath, String dataFileName, IOType
type) {
+ return create(partitionPath, dataFileName, type, Option.empty());
+ }
+
/**
* Creates a marker without checking if the marker already exists.
*
* @param partitionPath partition path in the table
* @param dataFileName data file name
* @param type write IO type
+ * @param handler could be empty
* @return the marker path
*/
- public Option<Path> create(String partitionPath, String dataFileName, IOType
type) {
- return create(partitionPath, dataFileName, type, false);
+ public Option<Path> create(String partitionPath, String dataFileName, IOType
type, Option<HoodieWriteHandle> handler) {
+ boolean checkIfExists = false;
+
+ if (handler.isPresent()
+ &&
handler.get().getConfig().getWriteConcurrencyMode().supportsOptimisticConcurrencyControl()
+ && handler.get().getConfig().isEarlyConflictDetectionEnable()) {
+
+ HoodieTableMetaClient metaClient =
handler.get().getHoodieTableMetaClient();
+ HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+
+ HoodieTimeline pendingCompactionTimeline =
activeTimeline.filterPendingCompactionTimeline();
+ HoodieTimeline pendingReplaceTimeline =
activeTimeline.filterPendingReplaceTimeline();
+ // TODO if current is compact or clustering then create marker directly
without early conflict detection.
+ // Need to support early conflict detection between table service and
common writers.
+ if (pendingCompactionTimeline.containsInstant(instantTime) ||
pendingReplaceTimeline.containsInstant(instantTime)) {
Review Comment:
Sg. @zhangyue19921010 could you add the details around this implementation
decision to the RFC?
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/embedded/EmbeddedTimelineService.java:
##########
@@ -86,7 +86,17 @@ public void startServer() throws IOException {
.enableMarkerRequests(true)
.markerBatchNumThreads(writeConfig.getMarkersTimelineServerBasedBatchNumThreads())
.markerBatchIntervalMs(writeConfig.getMarkersTimelineServerBasedBatchIntervalMs())
- .markerParallelism(writeConfig.getMarkersDeleteParallelism());
+ .markerParallelism(writeConfig.getMarkersDeleteParallelism())
+ ;
Review Comment:
nit: the semicolon should be on the same line.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/DirectWriteMarkers.java:
##########
@@ -155,6 +160,20 @@ protected Option<Path> create(String partitionPath, String
dataFileName, IOType
return create(getMarkerPath(partitionPath, dataFileName, type),
checkIfExists);
}
+ @Override
+ public Option<Path> createWithEarlyConflictDetection(String partitionPath,
String dataFileName, IOType type, boolean checkIfExists, Set<HoodieInstant>
completedCommitInstants,
Review Comment:
Could `completedCommitInstants` be derived from `activeTimeline`? If so,
there is no need to pass in `completedCommitInstants`.
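If the strategy treats the timeline it was constructed with as its snapshot, the completed instants can be computed from that timeline on demand, and only instants that completed afterwards need to be inspected. A minimal sketch of that idea, assuming simplified stand-in types: `SketchTimeline` and `SketchConflictCheck` are hypothetical, and instants are reduced to their timestamps.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified timeline: only tracks timestamps of completed instants.
final class SketchTimeline {
  private final Set<String> completed;

  SketchTimeline(Set<String> completed) {
    this.completed = new HashSet<>(completed);
  }

  // Returns a defensive copy so callers can mutate the result freely.
  Set<String> completedInstantTimes() {
    return new HashSet<>(completed);
  }
}

final class SketchConflictCheck {
  private SketchConflictCheck() {
  }

  // Instants that completed after the writer's snapshot was taken.
  // The snapshot set is derived from the timeline instead of being passed in separately.
  static Set<String> newlyCompleted(SketchTimeline snapshot, SketchTimeline reloaded) {
    Set<String> result = reloaded.completedInstantTimes();
    result.removeAll(snapshot.completedInstantTimes());
    return result;
  }
}
```

This only works if the timeline held by the strategy is not reloaded before the comparison; otherwise the snapshot and the reloaded view would be identical.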
##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieTimelineServerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+
+import java.util.Set;
+
+public abstract class HoodieTimelineServerBasedEarlyConflictDetectionStrategy
implements HoodieEarlyConflictDetectionStrategy {
Review Comment:
Implementation-wise, `HoodieEarlyConflictDetectionStrategy` is used by the
write client. I suggest that the timeline-server-related logic, i.e., the
`fresh` method containing the scheduling of the marker checker, be moved to the
timeline server.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkers.java:
##########
@@ -52,16 +58,47 @@ public WriteMarkers(String basePath, String
markerFolderPath, String instantTime
this.instantTime = instantTime;
}
+ public Option<Path> create(String partitionPath, String dataFileName, IOType
type) {
+ return create(partitionPath, dataFileName, type, Option.empty());
+ }
+
/**
* Creates a marker without checking if the marker already exists.
*
* @param partitionPath partition path in the table
* @param dataFileName data file name
* @param type write IO type
+ * @param handler could be empty
* @return the marker path
*/
- public Option<Path> create(String partitionPath, String dataFileName, IOType
type) {
- return create(partitionPath, dataFileName, type, false);
+ public Option<Path> create(String partitionPath, String dataFileName, IOType
type, Option<HoodieWriteHandle> handler) {
Review Comment:
nit: `HoodieWriteHandle` instance should not be directly passed in here.
Instead, only pass in what's needed, i.e., relevant write config, the timeline,
and the file ID, to avoid exposure of `HoodieWriteHandle`.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -62,6 +63,33 @@ public LockManager(HoodieWriteConfig writeConfig, FileSystem
fs) {
metrics = new HoodieLockMetrics(writeConfig);
}
+ /**
+ * Try to have a lock at partitionPath + fileID level for different write
handler.
+ * @param writeConfig
+ * @param fs
+ * @param partitionPath
+ * @param fileId
+ */
+ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String
partitionPath, String fileId) {
+ this.writeConfig = writeConfig;
+ this.hadoopConf = new SerializableConfiguration(fs.getConf());
+ TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/"
+ fileId);
+ this.lockConfiguration = new LockConfiguration(props);
+ maxRetries =
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
+ maxWaitTimeInMs =
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
+ }
+
+ /**
+ * rebuild lock related configs, only support ZK related lock for now.
+ */
+ private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig,
String key) {
+ TypedProperties props = new TypedProperties(writeConfig.getProps());
+ props.setProperty(LockConfiguration.ZK_LOCK_KEY_PROP_KEY, key);
Review Comment:
Sg. Let's use `LOCK_PROVIDER_CLASS_NAME` instead of `ZK_BASE_PATH_PROP_KEY`
for checking whether ZK-based lock is configured.
@zhangyue19921010 could you file a JIRA ticket besides the TODO because this
requires more work?
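A minimal sketch of checking the configured provider class rather than the presence of a ZK base path. The property key `hoodie.write.lock.provider` and the fully qualified `ZookeeperBasedLockProvider` name are assumptions here and should be taken from `HoodieLockConfig.LOCK_PROVIDER_CLASS_NAME` and the provider class itself in the real code; `LockProviderCheck` is a hypothetical helper.

```java
import java.util.Properties;

// Hypothetical helper; the key and class name below are assumed, not verified against Hudi.
final class LockProviderCheck {
  static final String LOCK_PROVIDER_KEY = "hoodie.write.lock.provider";
  static final String ZK_PROVIDER_CLASS =
      "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider";

  private LockProviderCheck() {
  }

  // True only when the lock provider is explicitly set to the ZK-based implementation.
  static boolean isZkLockConfigured(Properties props) {
    return ZK_PROVIDER_CLASS.equals(props.getProperty(LOCK_PROVIDER_KEY));
  }
}
```

Checking the provider class avoids a false positive when ZK properties are present in the config but a different lock provider is actually in use.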
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/TransactionManager.java:
##########
@@ -57,6 +62,14 @@ public void beginTransaction(Option<HoodieInstant>
newTxnOwnerInstant,
}
}
+ public void beginTransaction(String partitionPath, String fileId) {
Review Comment:
This can be simplified without taking the arguments because the constructor
takes the partition path and file ID. The transaction manager for the direct
markers should be implemented in a separate class to isolate the logic.
##########
hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java:
##########
@@ -124,6 +124,29 @@ public static class Config implements Serializable {
@Parameter(names = {"--marker-parallelism", "-mdp"}, description =
"Parallelism to use for reading and deleting marker files")
public int markerParallelism = 100;
+ @Parameter(names = {"--early-conflict-detection-strategy"}, description =
"Early conflict detection class name, this should be subclass of "
+ +
"org.apache.hudi.common.conflict.detection.HoodieEarlyConflictDetectionStrategy")
+ public String earlyConflictDetectStrategy =
"org.apache.hudi.timeline.service.handlers.marker.AsyncTimelineMarkerEarlyConflictDetectionStrategy";
+
+ @Parameter(names = {"--early-conflict-detection-check-commit-conflict"},
description = "Enable check commit conflict or not during early conflict
detect")
+ public Boolean checkCommitConflict = false;
+
+ @Parameter(names = {"--early-conflict-detection-enable"}, description =
"Enable early conflict detection based on markers. It will try to detect
writing conflict "
+ + "before create markers and fast fail which will release cluster
resources as soon as possible.")
+ public Boolean earlyConflictDetectionEnable = false;
+
+ @Parameter(names = {"--early-conflict-async-checker-batch-interval"},
description = "Used for timeline based marker
AsyncTimelineMarkerConflictResolutionStrategy. "
Review Comment:
nit: rename the timeline server configs to be aligned with
`HoodieWriteConfig`
##########
hudi-common/src/main/java/org/apache/hudi/common/conflict/detection/HoodieDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.conflict.detection;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.common.config.HoodieConfig;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public abstract class HoodieDirectMarkerBasedEarlyConflictDetectionStrategy
implements HoodieEarlyConflictDetectionStrategy {
+
+ private static final Logger LOG =
LogManager.getLogger(HoodieDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+ protected final String basePath;
+ protected final FileSystem fs;
+ protected final String partitionPath;
+ protected final String fileId;
+ protected final String instantTime;
+ protected final HoodieActiveTimeline activeTimeline;
+ protected final HoodieConfig config;
+ protected Set<HoodieInstant> completedCommitInstants;
+ protected final Boolean checkCommitConflict;
+ protected final Long maxAllowableHeartbeatIntervalInMs;
+
+ public HoodieDirectMarkerBasedEarlyConflictDetectionStrategy(String
basePath, HoodieWrapperFileSystem fs, String partitionPath, String fileId,
String instantTime,
+
HoodieActiveTimeline activeTimeline, HoodieConfig config, Boolean
checkCommitConflict, Long maxAllowableHeartbeatIntervalInMs,
+
HashSet<HoodieInstant> completedCommitInstants) {
+ this.basePath = basePath;
+ this.fs = fs;
+ this.partitionPath = partitionPath;
+ this.fileId = fileId;
+ this.instantTime = instantTime;
+ this.completedCommitInstants = completedCommitInstants;
+ this.activeTimeline = activeTimeline;
+ this.config = config;
+ this.checkCommitConflict = checkCommitConflict;
+ this.maxAllowableHeartbeatIntervalInMs = maxAllowableHeartbeatIntervalInMs;
+ }
+
+ /**
+ * We need to do list operation here.
+ * In order to reduce the list pressure as much as possible, first we build
path prefix in advance: '$base_path/.temp/instant_time/partition_path',
+ * and only list these specific partition_paths we need instead of list all
the '$base_path/.temp/'
+ * @param basePath
+ * @param partitionPath
+ * @param fileId 162b13d7-9530-48cf-88a4-02241817ae0c-0_1-74-100_003.parquet
+ * @return true if current fileID is already existed under
.temp/instant_time/partition_path/..
+ * @throws IOException
+ */
+ public boolean checkMarkerConflict(HoodieActiveTimeline activeTimeline,
String basePath, String partitionPath, String fileId,
Review Comment:
Similarly, the arguments should be simplified since the constructor takes
most of the information already.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/lock/LockManager.java:
##########
@@ -43,23 +47,52 @@
public class LockManager implements Serializable, AutoCloseable {
private static final Logger LOG = LogManager.getLogger(LockManager.class);
- private final HoodieWriteConfig writeConfig;
- private final LockConfiguration lockConfiguration;
- private final SerializableConfiguration hadoopConf;
- private final int maxRetries;
- private final long maxWaitTimeInMs;
+ private HoodieWriteConfig writeConfig;
+ private LockConfiguration lockConfiguration;
+ private SerializableConfiguration hadoopConf;
+ private int maxRetries;
+ private long maxWaitTimeInMs;
private transient HoodieLockMetrics metrics;
private volatile LockProvider lockProvider;
public LockManager(HoodieWriteConfig writeConfig, FileSystem fs) {
+ init(writeConfig, fs.getConf(), writeConfig.getProps());
+ }
+
+ /**
+ * Try to have a lock at partitionPath + fileID level for different write
handler.
+ * @param writeConfig
+ * @param fs
+ * @param partitionPath
+ * @param fileId
+ */
+ public LockManager(HoodieWriteConfig writeConfig, FileSystem fs, String
partitionPath, String fileId) {
+ TypedProperties props = refreshLockConfig(writeConfig, partitionPath + "/"
+ fileId);
+ init(writeConfig, fs.getConf(), props);
+ }
+
+ private void init(HoodieWriteConfig writeConfig, Configuration conf,
TypedProperties lockProps) {
+ this.lockConfiguration = new LockConfiguration(lockProps);
this.writeConfig = writeConfig;
- this.hadoopConf = new SerializableConfiguration(fs.getConf());
- this.lockConfiguration = new LockConfiguration(writeConfig.getProps());
- maxRetries =
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
+ this.hadoopConf = new SerializableConfiguration(conf);
+ this.maxRetries =
lockConfiguration.getConfig().getInteger(LOCK_ACQUIRE_CLIENT_NUM_RETRIES_PROP_KEY,
Integer.parseInt(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_NUM_RETRIES.defaultValue()));
- maxWaitTimeInMs =
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
+ this.maxWaitTimeInMs =
lockConfiguration.getConfig().getLong(LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS_PROP_KEY,
Long.parseLong(HoodieLockConfig.LOCK_ACQUIRE_CLIENT_RETRY_WAIT_TIME_IN_MILLIS.defaultValue()));
- metrics = new HoodieLockMetrics(writeConfig);
+ this.metrics = new HoodieLockMetrics(writeConfig);
+ }
+
+ /**
+ * rebuild lock related configs, only support ZK related lock for now.
+ */
+ private TypedProperties refreshLockConfig(HoodieWriteConfig writeConfig,
String key) {
Review Comment:
It is better to move this logic of changing lock configs to the transaction
manager for the direct markers, as the lock manager itself should not be aware
of any higher-layer logic.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import
org.apache.hudi.common.conflict.detection.HoodieDirectMarkerBasedEarlyConflictDetectionStrategy;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This strategy is used for direct marker writers, trying to do early
conflict detection.
+ * It will use fileSystem api like list and exist directly to check if there
is any marker file conflict.
+ */
+public class SimpleDirectMarkerBasedEarlyConflictDetectionStrategy extends
HoodieDirectMarkerBasedEarlyConflictDetectionStrategy {
+
+ private static final Logger LOG =
LogManager.getLogger(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+
+ public SimpleDirectMarkerBasedEarlyConflictDetectionStrategy(String
basePath, HoodieWrapperFileSystem fs, String partitionPath, String fileId,
String instantTime,
Review Comment:
nit: the full write config is not needed. We should simplify the list of
arguments passed in here.
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.marker;
+
+import
org.apache.hudi.common.conflict.detection.HoodieDirectMarkerBasedEarlyConflictDetectionStrategy;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.util.MarkerUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieEarlyConflictDetectionException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ConcurrentModificationException;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * This strategy is used for direct marker writers, trying to do early
conflict detection.
+ * It will use fileSystem api like list and exist directly to check if there
is any marker file conflict.
+ */
+public class SimpleDirectMarkerBasedEarlyConflictDetectionStrategy extends
HoodieDirectMarkerBasedEarlyConflictDetectionStrategy {
+
+ private static final Logger LOG =
LogManager.getLogger(SimpleDirectMarkerBasedEarlyConflictDetectionStrategy.class);
+
+ public SimpleDirectMarkerBasedEarlyConflictDetectionStrategy(String
basePath, HoodieWrapperFileSystem fs, String partitionPath, String fileId,
String instantTime,
+
HoodieActiveTimeline activeTimeline, HoodieWriteConfig config, Boolean
checkCommitConflict) {
+ super(basePath, fs, partitionPath, fileId, instantTime, activeTimeline,
config, checkCommitConflict);
+ }
+
+ @Override
+ public boolean hasMarkerConflict() {
+ try {
+ return checkMarkerConflict(basePath, partitionPath, fileId, fs,
instantTime)
+ || (checkCommitConflict &&
MarkerUtils.hasCommitConflict(Stream.of(fileId).collect(Collectors.toSet()),
basePath, completedCommitInstants));
+ } catch (IOException e) {
+ LOG.warn("Exception occurs during create marker file in eager conflict
detection mode.");
+ throw new HoodieIOException("Exception occurs during create marker file
in eager conflict detection mode.", e);
+ }
+ }
+
+ @Override
+ public void resolveMarkerConflict(String basePath, String partitionPath,
String dataFileName) {
+ throw new HoodieEarlyConflictDetectionException(new
ConcurrentModificationException("Early conflict detected but cannot resolve
conflicts for overlapping writes"));
Review Comment:
As @zhangyue19921010 explained, this is unnecessary because compaction and
clustering instants do not go through the early conflict detection logic.
##########
hudi-common/src/main/java/org/apache/hudi/common/util/MarkerUtils.java:
##########
@@ -208,10 +219,116 @@ public static Set<String> readMarkersFromFile(Path
markersFilePath, Serializable
fsDataInputStream = fs.open(markersFilePath);
markers = new
HashSet<>(FileIOUtils.readAsUTFStringLines(fsDataInputStream));
} catch (IOException e) {
- throw new HoodieIOException("Failed to read MARKERS file " +
markersFilePath, e);
+ if (ignoreException) {
+ LOG.warn("IOException occurs during read MARKERS file, ", e);
+ } else {
+ throw new HoodieIOException("Failed to read MARKERS file " +
markersFilePath, e);
+ }
} finally {
closeQuietly(fsDataInputStream);
}
return markers;
}
+
+ public static List<Path> getAllMarkerDir(Path tempPath, FileSystem fs)
throws IOException {
+ return
Arrays.stream(fs.listStatus(tempPath)).map(FileStatus::getPath).collect(Collectors.toList());
+ }
+
+ public static boolean hasCommitConflict(HoodieActiveTimeline activeTimeline,
Set<String> currentFileIDs, Set<HoodieInstant> completedCommitInstants) {
+
+ Set<HoodieInstant> currentInstants =
activeTimeline.reload().getCommitsTimeline().filterCompletedInstants().getInstants().collect(Collectors.toSet());
+
+ currentInstants.removeAll(completedCommitInstants);
+ Set<String> missingFileIDs = currentInstants.stream().flatMap(instant -> {
+ try {
+ return
HoodieCommitMetadata.fromBytes(activeTimeline.getInstantDetails(instant).get(),
HoodieCommitMetadata.class)
+ .getFileIdAndRelativePaths().keySet().stream();
+ } catch (Exception e) {
+ return Stream.empty();
+ }
+ }).collect(Collectors.toSet());
+ currentFileIDs.retainAll(missingFileIDs);
+ return !currentFileIDs.isEmpty();
+ }
+
+ /**
+ * Get Candidate Instant to do conflict checking:
+ * 1. Skip current writer related instant(currentInstantTime)
+ * 2. Skip all instants after currentInstantTime
+ * 3. Skip dead writers related instants based on heart-beat
+ * 4. Skip pending compaction instant (For now we don' do early conflict
check with compact action)
+ * Because we don't want to let pending compaction block common writer.
+ * @param instants
+ * @return
+ */
+ public static List<String> getCandidateInstants(HoodieActiveTimeline
activeTimeline, List<Path> instants, String currentInstantTime,
+ long
maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) {
+
+ HoodieActiveTimeline reloadActive = activeTimeline.reload();
+
+ return instants.stream().map(Path::toString).filter(instantPath -> {
+ String instantTime = markerDirToInstantTime(instantPath);
+ return instantTime.compareToIgnoreCase(currentInstantTime) < 0
+ &&
!reloadActive.filterPendingCompactionTimeline().containsInstant(instantTime)
+ &&
!reloadActive.filterPendingReplaceTimeline().containsInstant(instantTime);
+ }).filter(instantPath -> {
+ try {
+ return !isHeartbeatExpired(markerDirToInstantTime(instantPath),
maxAllowableHeartbeatIntervalInMs, fs, basePath);
+ } catch (IOException e) {
+ return false;
+ }
+ }).collect(Collectors.toList());
+ }
+
+ /**
+ * Get fileID from full marker path, for example:
+ *
20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0_85-15-1390_20220620181735781.parquet.marker.MERGE
+ * ==> get 20210623/0/20210825/932a86d9-5c1d-44c7-ac99-cb88b8ef8478-0
+ * @param marker
+ * @return
+ */
+ public static String makerToPartitionAndFileID(String marker) {
+ String[] ele = marker.split("_");
+ return ele[0];
+ }
+
+ /**
+ * Get instantTime from full marker path, for example:
+ *
/var/folders/t3/th1dw75d0yz2x2k2qt6ys9zh0000gp/T/junit6502909693741900820/dataset/.hoodie/.temp/003
+ * ==> 003
+ * @param marker
+ * @return
+ */
+ public static String markerDirToInstantTime(String marker) {
+ String[] ele = marker.split("/");
+ return ele[ele.length - 1];
+ }
+
+ /**
+ * Use modification time as last heart beat time
+ * @param fs
+ * @param basePath
+ * @param instantTime
+ * @return
+ * @throws IOException
+ */
+ public static Long getLastHeartbeatTime(FileSystem fs, String basePath,
String instantTime) throws IOException {
+ Path heartbeatFilePath = new
Path(HoodieTableMetaClient.getHeartbeatFolderPath(basePath) + Path.SEPARATOR +
instantTime);
+ if (fs.exists(heartbeatFilePath)) {
+ return fs.getFileStatus(heartbeatFilePath).getModificationTime();
+ } else {
+ // NOTE : This can happen when a writer is upgraded to use lazy cleaning
and the last write had failed
+ return 0L;
+ }
+ }
+
+ public static boolean isHeartbeatExpired(String instantTime, long
maxAllowableHeartbeatIntervalInMs, FileSystem fs, String basePath) throws
IOException {
+ Long currentTime = System.currentTimeMillis();
+ Long lastHeartbeatTime = getLastHeartbeatTime(fs, basePath, instantTime);
+ if (currentTime - lastHeartbeatTime > maxAllowableHeartbeatIntervalInMs) {
+ LOG.warn("Heartbeat expired, for instant: " + instantTime);
+ return true;
+ }
+ return false;
+ }
Review Comment:
Given the limitation, it now makes sense to move `HeartbeatUtils` to
`hudi-common`, so that common utils can be shared instead of duplicating code.
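The expiry check is small enough to live in one shared utility. The following is a minimal sketch of that logic using `java.nio.file` instead of the Hadoop `FileSystem` API. `HeartbeatSketch` is a hypothetical class, not the existing `HeartbeatUtils`, and it keeps the quoted code's convention of treating a missing heartbeat file as a last heartbeat time of 0.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical shared utility; uses java.nio.file rather than Hadoop's FileSystem.
final class HeartbeatSketch {
  private HeartbeatSketch() {
  }

  // Uses the file's modification time as the last heartbeat; 0 if the file is missing.
  static long lastHeartbeatTimeMs(Path heartbeatFile) {
    if (!Files.exists(heartbeatFile)) {
      return 0L;
    }
    try {
      return Files.getLastModifiedTime(heartbeatFile).toMillis();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Pure comparison, kept separate from file access so it is easy to test.
  static boolean isExpired(long lastHeartbeatMs, long maxAllowableIntervalMs, long nowMs) {
    return nowMs - lastHeartbeatMs > maxAllowableIntervalMs;
  }

  // Convenience overload that reads the heartbeat file and uses the current wall-clock time.
  static boolean isExpired(Path heartbeatFile, long maxAllowableIntervalMs) {
    return isExpired(lastHeartbeatTimeMs(heartbeatFile), maxAllowableIntervalMs,
        System.currentTimeMillis());
  }
}
```

Keeping the comparison free of file-system access lets both the write client and the timeline server reuse it with whichever file-system abstraction they already have.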