ksmou commented on code in PR #9229:
URL: https://github.com/apache/hudi/pull/9229#discussion_r1279049899
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieCompactor.java:
##########
@@ -265,11 +267,36 @@ private int doCompact(JavaSparkContext jsc) throws Exception {
cfg.compactionInstantTime = firstCompactionInstant.get().getTimestamp();
LOG.info("Found the earliest scheduled compaction instant which will be executed: "
+ cfg.compactionInstantTime);
- } else {
- LOG.info("There is no scheduled compaction in the table.");
- return 0;
+ }
+
+ // update cfg.compactionInstantTime if finding an expired instant
+ List<HoodieInstant> inflightInstants = metaClient.getActiveTimeline()
+ .filterPendingCompactionTimeline().filterInflights().getInstants();
+ List<String> expiredInstants = inflightInstants.stream().filter(instant -> {
+ try {
+ return client.getHeartbeatClient().isHeartbeatExpired(instant.getTimestamp());
+ } catch (IOException io) {
+ LOG.info("Failed to check heartbeat for instant " + instant);
+ }
+ return false;
+ }).map(HoodieInstant::getTimestamp).collect(Collectors.toList());
+
+ if (!expiredInstants.isEmpty()) {
+ cfg.compactionInstantTime = expiredInstants.get(0);
+ LOG.info("Found expired compaction instant, update the earliest scheduled compaction instant which will be executed: "
+ + cfg.compactionInstantTime);
}
}
+
+ // do nothing if cfg.compactionInstantTime still is null
+ if (StringUtils.isNullOrEmpty(cfg.compactionInstantTime)) {
+ LOG.info("There is no scheduled compaction in the table.");
+ return 0;
+ }
+
+ // start a heartbeat for the instant
+ client.getHeartbeatClient().start(cfg.compactionInstantTime);
+
Review Comment:
> I mean the original try-finally block closes the write client after the whole compaction/clean finishes; the new try and close seem to do the same thing?
You are right. I overlooked that SparkRDDWriteClient implements
AutoCloseable, so a try-with-resources block closes the write client
automatically and there is no need to add a finally block. Note, however,
that the heartbeat client is also closed when the write client is closed.
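
To illustrate the point about close ordering, here is a minimal, self-contained sketch. `FakeWriteClient` and `FakeHeartbeatClient` are hypothetical stand-ins written for this example, not Hudi classes, and the instant time is a made-up value. The sketch only assumes what is stated above: the write client is AutoCloseable, and closing it also closes the heartbeat client it owns.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: the classes below are hypothetical stand-ins, not Hudi's real clients.
class CloseOrderSketch {

    // Stand-in for a heartbeat client that records what happens to it.
    static class FakeHeartbeatClient implements AutoCloseable {
        private final List<String> events;
        private boolean closed = false;

        FakeHeartbeatClient(List<String> events) {
            this.events = events;
        }

        void start(String instantTime) {
            if (closed) {
                throw new IllegalStateException("heartbeat client already closed");
            }
            events.add("heartbeat started for " + instantTime);
        }

        @Override
        public void close() {
            closed = true;
            events.add("heartbeat client closed");
        }
    }

    // Stand-in for a write client that owns a heartbeat client.
    static class FakeWriteClient implements AutoCloseable {
        private final List<String> events;
        private final FakeHeartbeatClient heartbeatClient;

        FakeWriteClient(List<String> events) {
            this.events = events;
            this.heartbeatClient = new FakeHeartbeatClient(events);
        }

        FakeHeartbeatClient getHeartbeatClient() {
            return heartbeatClient;
        }

        @Override
        public void close() {
            // Assumption taken from the comment above: closing the write
            // client also closes its heartbeat client.
            heartbeatClient.close();
            events.add("write client closed");
        }
    }

    // Runs a fake compaction and returns the recorded sequence of events.
    static List<String> run(String instantTime) {
        List<String> events = new ArrayList<>();
        // try-with-resources calls client.close() when the block exits,
        // normally or by exception, so no explicit finally block is needed.
        try (FakeWriteClient client = new FakeWriteClient(events)) {
            client.getHeartbeatClient().start(instantTime);
            events.add("compaction done");
        }
        return events;
    }

    public static void main(String[] args) {
        // "20230731000000" is an arbitrary, made-up instant time.
        run("20230731000000").forEach(System.out::println);
    }
}
```

In this sketch the heartbeat stays alive for exactly as long as the try block runs, so any heartbeat that has to outlive the compaction cannot be left to a client scoped this way.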