kbuci commented on code in PR #10965:
URL: https://github.com/apache/hudi/pull/10965#discussion_r1556157237
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1138,36 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    Option<HoodieInstant> instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline()
+        .filterCompletedAndCompactionInstants()
+        .getInstants()
+        .stream()
+        .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+        .findFirst());
+    try {
+      // Transaction serves to ensure only one compact job for this instant will start a heartbeat, and any other
+      // concurrent compact job will abort if it attempts to execute compaction before the heartbeat expires.
+      // Note that as long as all jobs for this table use this API for compaction, this alone should prevent
+      // compact rollbacks from running concurrently with compact commits.
+      txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner());
Review Comment:
> 1.2) if the heartbeat does not expire, just cancel the execution of this run and log a warning there.

Just to clarify, do you mean throwing an exception in this run? I'm not sure we can make the current run a no-op when a concurrent heartbeat is detected, because:

a) I'm not sure what HoodieWriteMetadata value we would return from a no-op run.

b) If we don't explicitly throw an exception and fail, the caller will assume compaction either succeeded or had already happened. That wouldn't be a correct assumption, since the other concurrent writer (which is currently executing this compact plan) may still fail or take a long time to finish.
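To make point (b) concrete, here is a minimal toy sketch of the fail-fast choice. The class and method names (`CompactGuard`, `ensureNoLiveHeartbeat`, `ConcurrentCompactionException`) are hypothetical illustrations, not actual Hudi APIs; the heartbeat registry is a plain in-memory map standing in for heartbeat files.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: fail fast with an exception instead of a silent no-op
// when a live heartbeat for the compaction instant is detected.
class ConcurrentCompactionException extends RuntimeException {
  ConcurrentCompactionException(String msg) {
    super(msg);
  }
}

class CompactGuard {
  // Simulated heartbeat registry: instant time -> last heartbeat epoch millis.
  private final Map<String, Long> heartbeats = new HashMap<>();
  private final long expiryMs;

  CompactGuard(long expiryMs) {
    this.expiryMs = expiryMs;
  }

  void recordHeartbeat(String instant, long nowMs) {
    heartbeats.put(instant, nowMs);
  }

  // Throwing here (rather than returning some fabricated write metadata)
  // means the caller cannot mistake an aborted run for a successful or
  // already-completed compaction.
  void ensureNoLiveHeartbeat(String instant, long nowMs) {
    Long last = heartbeats.get(instant);
    if (last != null && nowMs - last < expiryMs) {
      throw new ConcurrentCompactionException(
          "Another job holds a live heartbeat for instant " + instant);
    }
  }
}
```

This makes the contract explicit: a caller that sees a normal return knows compaction really completed, while a concurrent-writer collision surfaces as an exception it must handle.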
> if the state is still REQUESTED, we can execute it directly?

Ah, that's a good point: it might actually be safe for two jobs to execute a compact plan at the same time, as long as neither of them is doing a rollback. Despite that, I don't think it's safe to skip acquiring/starting the heartbeat even when the compact plan only has `.requested`, since the following (unlikely) scenario can still happen:

1. Table has a compact plan C.requested created in the timeline
2. Job (A) calls compact on C. It starts a heartbeat for C and then starts executing C
3. Job (B) calls compact on C. Although it sees a heartbeat for C, since C has no C.inflight it starts executing C
4. Job (A) and/or Job (B) create a C.inflight
5. Job (A) fails
6. The heartbeat that Job (A) created expires
7. Job (C) calls compact on C. It sees that there is a C.inflight and no live heartbeat (because Job (B) never started one). Therefore, it starts executing C and rolls back the existing C.inflight
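The seven steps above can be replayed as a toy simulation. Everything here is illustrative (the state strings, the expiry window, the single-threaded replay), not Hudi internals; it just demonstrates that if Job (B) skips starting a heartbeat, Job (C) later observes "inflight + no live heartbeat" and performs an unsafe rollback.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy replay of the race: Job B executes without a heartbeat because it only
// saw C.requested, so Job C later mistakes B's in-progress work for a dead job.
class RaceDemo {
  static final long EXPIRY_MS = 10;

  static String replay() {
    Set<String> timeline = new HashSet<>(List.of("C.requested"));
    Map<String, Long> heartbeats = new HashMap<>();
    long now = 0;

    // Step 2: Job A starts a heartbeat for C and begins executing.
    heartbeats.put("C", now);
    // Step 3: Job B sees a heartbeat but no C.inflight, so it starts
    // executing anyway, WITHOUT starting its own heartbeat.
    boolean jobBExecuting = !timeline.contains("C.inflight");
    // Step 4: A and/or B transition C to inflight.
    timeline.add("C.inflight");
    // Steps 5-6: Job A fails and its heartbeat expires.
    now += EXPIRY_MS + 1;
    Long last = heartbeats.get("C");
    boolean heartbeatLive = last != null && now - last < EXPIRY_MS;
    // Step 7: Job C sees C.inflight with no live heartbeat and rolls it back,
    // even though Job B is still executing the same plan.
    if (timeline.contains("C.inflight") && !heartbeatLive && jobBExecuting) {
      return "unsafe: rolled back work Job B is still doing";
    }
    return "safe";
  }
}
```

Requiring every executor to start (or take over) the heartbeat before executing, under the transaction, closes this hole: Job B would either abort at step 3 or hold its own live heartbeat, so Job C could no longer conclude the inflight plan is abandoned.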
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1138,36 @@ protected void
completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
*/
protected HoodieWriteMetadata<O> compact(String compactionInstantTime,
boolean shouldComplete) {
HoodieTable table = createTable(config, context.getHadoopConf().get());
+ Option<HoodieInstant> instantToCompactOption =
Option.fromJavaOptional(table.getActiveTimeline()
+ .filterCompletedAndCompactionInstants()
+ .getInstants()
+ .stream()
+ .filter(instant ->
HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+ .findFirst());
+ try {
+ // Transaction serves to ensure only one compact job for this instant
will start heartbeat, and any other concurrent
+ // compact job will abort if they attempt to execute compact before
heartbeat expires
+ // Note that as long as all jobs for this table use this API for
compact, then this alone should prevent
+ // compact rollbacks from running concurrently to compact commits.
+ txnManager.beginTransaction(instantToCompactOption,
txnManager.getLastCompletedTransactionOwner());
Review Comment:
> 1.2) if the heartbeat does not expire, just can the execution of this run
and log a wanning log there.
Just to clarify, do you mean throwing an exception in this run? Since I'm
not sure if we can make the current run a no-op if a concurrent heartbeat is
detected, since
a) I'm not sure what HoodieWriteMetadata value to return if we make this a
no-op
b) If we don't explicitly throw an exception and fail, then the caller will
assume compaction happened succesfully or already happened. This wouldn't be a
correct assumption, since the other concurrent writer (that is currently
executing this compact plan) may either fail or take a long time to finish.
> if the state if still REQUESTED, we can execute it direcly?
Ah that's a good point, it might actually be safe for two jobs to execute a
compact plan at same time as long as neither of them are doing a roll back.
Despite that though, I don't think it's safe to skip acquiring+starting
heartbeat even if the compact plan only has .requested , since the following
(unlikely) scenario can happen still:
1. Table has a compact plan C.requested created in timeline
2. Job (A) calls compact on C. It starts a heartbeat of C and then starts
executing C
3. Job (B) calls compact on C. Although it sees a heartbeat for C, since C
has no C.inflight it starts executing C
4. Job (A) and/or Job (B) create a C.inflight
5. Job (A) fails.
6. Heartbeat that Job (A) created expires
7. Job (C) calls compact on C. It sees that there is a C.inflight and no
heartbeat (because Job (B) did not start any heartbeat). Therefore, it starts
executing C, and rolls back the existing C.inflight
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]