kbuci commented on code in PR #10965:
URL: https://github.com/apache/hudi/pull/10965#discussion_r1556395911
##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java:
##########
@@ -1135,8 +1138,34 @@ protected void completeLogCompaction(HoodieCommitMetadata metadata, HoodieTable
    */
   protected HoodieWriteMetadata<O> compact(String compactionInstantTime, boolean shouldComplete) {
     HoodieTable table = createTable(config, context.getHadoopConf().get());
+    Option<HoodieInstant> instantToCompactOption = Option.fromJavaOptional(table.getActiveTimeline()
+        .filterCompletedAndCompactionInstants()
+        .getInstants()
+        .stream()
+        .filter(instant -> HoodieActiveTimeline.EQUALS.test(instant.getTimestamp(), compactionInstantTime))
+        .findFirst());
+    try {
+      // Transaction serves to ensure only one compact job for this instant will start heartbeat, and any other concurrent
+      // compact job will abort if they attempt to execute compact before heartbeat expires
+      // Note that as long as all jobs for this table use this API for compact, then this alone should prevent
+      // compact rollbacks from running concurrently to compact commits.
+      txnManager.beginTransaction(instantToCompactOption, txnManager.getLastCompletedTransactionOwner());
+      try {
+        if (!this.heartbeatClient.isHeartbeatExpired(compactionInstantTime)) {
+          throw new HoodieLockException("Cannot compact instant " + compactionInstantTime + " due to heartbeat by existing job");
+        }
+      } catch (IOException e) {
+        throw new HoodieHeartbeatException("Error accessing heartbeat of instant to compact " + compactionInstantTime, e);
+      }
+      this.heartbeatClient.start(compactionInstantTime);
+    } finally {
+      txnManager.endTransaction(txnManager.getCurrentTransactionOwner());
+    }
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
-    return tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    HoodieWriteMetadata compactMetadata = tableServiceClient.compact(compactionInstantTime, shouldComplete);
+    this.heartbeatClient.stop(compactionInstantTime, true);
Review Comment:
I was looking into a UT failure in
`org.apache.hudi.table.functional.TestHoodieSparkMergeOnReadTableInsertUpdateDelete#testRepeatedRollbackOfCompaction`,
where two compact executions of the same instant time are called back to back
(my understanding is that this is meant to verify that the second compact is a
no-op and succeeds once it sees that the plan is already committed).
I realized that with this change, the second compact call was failing because
`isHeartbeatExpired` saw an active heartbeat (from the first attempt) still
running, even though we stop the heartbeat here after successfully completing
the compact. `isHeartbeatExpired` was unexpectedly `false` here because:
1. `isHeartbeatExpired` returns false if the instant time is too recent, even
if the heartbeat has already been stopped (in the in-memory mapping).
2. When
`org.apache.hudi.client.heartbeat.HoodieHeartbeatClient#stop(java.lang.String)`
is called (by the first compact call in the UT), the heartbeat file is deleted
and the heartbeat in the in-memory mapping is stopped, as expected. But this
means the heartbeat cannot be started again (even if (1) is resolved), since the
heartbeat API doesn't allow a caller to start a heartbeat that is present in the
in-memory mapping and has its heartbeatStopped flag set to true.
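A minimal sketch of the two behaviors above, under stated assumptions: `StoppedHeartbeatModel` is a hypothetical in-memory stand-in for `HoodieHeartbeatClient` (not its real implementation), time is passed in explicitly so the sequence is deterministic, and expiry is assumed to be a plain "now minus last beat exceeds the allowed interval" check. It replays the back-to-back sequence from the test: start, stop, then a second expiry check and a second start.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of points (1) and (2); NOT Hudi's HoodieHeartbeatClient.
// Class name, fields and the timing rule are assumptions made for illustration.
class StoppedHeartbeatModel {
  private static final class Heartbeat {
    long lastBeatMs;
    boolean stopped;
  }

  private final Map<String, Heartbeat> inMemory = new HashMap<>();
  private final long maxIntervalMs;

  StoppedHeartbeatModel(long maxIntervalMs) {
    this.maxIntervalMs = maxIntervalMs;
  }

  void start(String instant, long nowMs) {
    Heartbeat existing = inMemory.get(instant);
    if (existing != null && existing.stopped) {
      // (2): a stopped heartbeat that is still in the map cannot be started again.
      throw new IllegalStateException("Heartbeat for " + instant + " was already stopped");
    }
    Heartbeat hb = new Heartbeat();
    hb.lastBeatMs = nowMs;
    inMemory.put(instant, hb);
  }

  void stop(String instant) {
    Heartbeat hb = inMemory.get(instant);
    if (hb != null) {
      hb.stopped = true; // flagged as stopped, but the entry stays in the map
    }
  }

  boolean isHeartbeatExpired(String instant, long nowMs) {
    Heartbeat hb = inMemory.get(instant);
    // Assumption: no entry behaves like a missing heartbeat file (last beat = 0).
    long lastBeat = hb == null ? 0L : hb.lastBeatMs;
    // (1): the check is purely time-based and ignores the stopped flag,
    // so a recently stopped heartbeat is still reported as not expired.
    return nowMs - lastBeat > maxIntervalMs;
  }
}
```

With a 60 s interval, starting at t=100000 ms, stopping, and checking at t=101000 ms gives `isHeartbeatExpired == false`, and the second `start` throws, which matches what the second compact call runs into.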
To get around this issue, I added another method to the heartbeat API, similar
to `stop` except that it also removes the given heartbeat from the in-memory
mapping (forcing any later compact call in the same job to re-read the heartbeat
files from DFS and create a new heartbeat in the in-memory mapping). I'm not
sure whether there is a better approach here, though. I assume the existing
behavior isn't a bug, since it makes sense for commits that cannot be
re-executed (like ingestion COMMITs), and I assume the issue here stems from the
fact that compact may need to restart a stopped heartbeat, possibly repeatedly.
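The workaround can be sketched with only the in-memory bookkeeping. `RestartableHeartbeatModel` and `stopAndRemove` are hypothetical names (the comment does not name the new method), and the heartbeat file deletion and DFS fallback are not modeled: `stop` keeps a stopped entry that blocks a restart, while `stopAndRemove` forgets the entry so a later `start` for the same instant succeeds.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the workaround; names are illustrative, not Hudi's API.
// Maps instant time -> "stopped" flag for heartbeats this client knows about.
class RestartableHeartbeatModel {
  private final Map<String, Boolean> stoppedByInstant = new HashMap<>();

  void start(String instant) {
    if (Boolean.TRUE.equals(stoppedByInstant.get(instant))) {
      throw new IllegalStateException("Heartbeat for " + instant + " was already stopped");
    }
    stoppedByInstant.put(instant, false);
  }

  // Existing behavior: mark the heartbeat as stopped but keep the entry.
  void stop(String instant) {
    stoppedByInstant.computeIfPresent(instant, (k, v) -> true);
  }

  // Proposed behavior: drop the entry entirely. Assumed to also do what stop()
  // does (stop the timer, delete the heartbeat file); that part is not modeled.
  void stopAndRemove(String instant) {
    stoppedByInstant.remove(instant);
  }

  boolean isTracked(String instant) {
    return stoppedByInstant.containsKey(instant);
  }
}
```

The trade-off is that after `stopAndRemove` the client no longer remembers that it ever ran this instant, so the next expiry check has to go back to DFS; that is acceptable for compaction, which may legitimately be re-attempted, but not for ingestion commits.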