Zouxxyy commented on code in PR #9233:
URL: https://github.com/apache/hudi/pull/9233#discussion_r1271265718
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/transaction/SimpleConcurrentFileWritesConflictResolutionStrategy.java:
##########
@@ -18,65 +18,144 @@
package org.apache.hudi.client.transaction;
-import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.WriteOperationType;
import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.ClusteringUtils;
import org.apache.hudi.common.util.CollectionUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieWriteConflictException;
-import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.bucket.BucketIdentifier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ConcurrentModificationException;
import java.util.HashSet;
import java.util.Set;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
+import static org.apache.hudi.common.model.WriteOperationType.CLUSTER;
+import static org.apache.hudi.common.model.WriteOperationType.LOG_COMPACT;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMMIT_ACTION;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.COMPACTION_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.DELTA_COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.GREATER_THAN;
import static
org.apache.hudi.common.table.timeline.HoodieTimeline.REPLACE_COMMIT_ACTION;
+import static
org.apache.hudi.common.table.timeline.HoodieTimeline.compareTimestamps;
/**
- * This class is a basic implementation of a conflict resolution strategy for
concurrent writes {@link ConflictResolutionStrategy}.
+ * This class is a basic implementation of a conflict resolution strategy for
concurrent writes {@link ConflictResolutionStrategy}
+ * based on start_timestamp; it needs to record the pending instants before the
write.
*/
public class SimpleConcurrentFileWritesConflictResolutionStrategy
implements ConflictResolutionStrategy {
private static final Logger LOG =
LoggerFactory.getLogger(SimpleConcurrentFileWritesConflictResolutionStrategy.class);
+ private final HoodieWriteConfig writeConfig;
+
+ public
SimpleConcurrentFileWritesConflictResolutionStrategy(HoodieWriteConfig
writeConfig) {
+ this.writeConfig = writeConfig;
+ }
+
@Override
public Stream<HoodieInstant> getCandidateInstants(HoodieTableMetaClient
metaClient, HoodieInstant currentInstant,
- Option<HoodieInstant>
lastSuccessfulInstant) {
- HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
- // To find which instants are conflicting, we apply the following logic
- // 1. Get completed instants timeline only for commits that have happened
since the last successful write.
- // 2. Get any scheduled or completed compaction or clustering operations
that have started and/or finished
- // after the current instant. We need to check for write conflicts since
they may have mutated the same files
- // that are being newly created by the current write.
- Stream<HoodieInstant> completedCommitsInstantStream = activeTimeline
- .getCommitsTimeline()
+ Option<Set<String>>
pendingInstantsBeforeWritten) {
+ // Divide the process of obtaining candidate instants into two steps:
+ // 1) get completed instants after current instant starts
+ // 2) pick from instants that are still pending
+ return Stream.concat(getCompletedInstantsAfterCurrent(metaClient,
currentInstant, pendingInstantsBeforeWritten),
+ pickFromCurrentPendingInstants(metaClient, currentInstant));
+ }
+
+ /**
+ * Get completed instants after current instant starts based on
start_timestamp:
+ * 1) pick instants whose start_timestamp > the current start_timestamp and that
have completed
+ * 2) pending instants that were recorded before the write and have since been
converted to completed
+ *
+ * @param metaClient meta client
+ * @param currentInstant current instant
+ * @param pendingInstantsBeforeWrite pending instant recorded before write
+ * @return selected {@link HoodieInstant} stream
+ */
+ public Stream<HoodieInstant>
getCompletedInstantsAfterCurrent(HoodieTableMetaClient metaClient,
HoodieInstant currentInstant,
+
Option<Set<String>> pendingInstantsBeforeWrite) {
+ return metaClient.getActiveTimeline()
+ .getTimelineOfActions(CollectionUtils.createSet(COMMIT_ACTION,
REPLACE_COMMIT_ACTION, COMPACTION_ACTION, DELTA_COMMIT_ACTION))
.filterCompletedInstants()
- .findInstantsAfter(lastSuccessfulInstant.isPresent() ?
lastSuccessfulInstant.get().getTimestamp() : HoodieTimeline.INIT_INSTANT_TS)
+ .filter(i -> compareTimestamps(i.getTimestamp(), GREATER_THAN,
currentInstant.getTimestamp())
+ || (pendingInstantsBeforeWrite.isPresent() &&
pendingInstantsBeforeWrite.get().contains(i.getTimestamp())))
.getInstantsAsStream();
+ }
+
+ /**
+ * Pick from instants that are still pending; we need to compare the priority
of the two operations.
+ *
+ * @param metaClient meta client
+ * @param currentInstant current instant
+ * @return selected {@link HoodieInstant} stream
+ */
+ public Stream<HoodieInstant>
pickFromCurrentPendingInstants(HoodieTableMetaClient metaClient, HoodieInstant
currentInstant) {
+
+ // Whether to pick the pending instant to candidate instants:
+ //
+-----------------+----------------------------------------+---------------------------------------+------------+
+ // | current\pending | ingestion |
clustering | compaction |
+ //
|-----------------+----------------------------------------+---------------------------------------+------------+
+ // | ingestion | no | no if
#isIngestionPrimaryClustering() | yes |
+ //
|-----------------+----------------------------------------+---------------------------------------+------------+
+ // | clustering | yes if #isIngestionPrimaryClustering() | no
| no |
+ //
+-----------------+----------------------------------------+---------------------------------------+------------+
+
+ Set<String> actionsToPick = new HashSet<>();
+
+ if (REPLACE_COMMIT_ACTION.equals(currentInstant.getAction())
+ && ClusteringUtils.isPendingClusteringInstant(metaClient,
currentInstant)) {
+ if (isIngestionPrimaryClustering()) {
+ actionsToPick.add(COMMIT_ACTION);
+ actionsToPick.add(DELTA_COMMIT_ACTION);
+ }
+ } else {
+ if (!isIngestionPrimaryClustering()) {
+ actionsToPick.add(REPLACE_COMMIT_ACTION);
+ }
+ actionsToPick.add(COMPACTION_ACTION);
Review Comment:
Here I just keep the current implementation, because log compaction has a
pre-commit step now; hoping to get more opinions @suryaprasanna
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]