Davis-Zhang-Onehouse commented on code in PR #19042:
URL: https://github.com/apache/hudi/pull/19042#discussion_r3454837509


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -219,41 +220,41 @@ private void inlineCompaction(HoodieTable table, 
Option<Map<String, String>> ext
    * @return Collection of Write Status
    */
   protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime, 
boolean shouldComplete) {
-    HoodieTable<?, I, ?, T> table = createTable(config, 
context.getStorageConf());
-
-    // Check if a commit or compaction instant with a greater timestamp is on 
the timeline.
-    // If an instant is found then abort log compaction, since it is no longer 
needed.
-    Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, 
COMPACTION_ACTION);
-    Option<HoodieInstant> compactionInstantWithGreaterTimestamp =
-        Option.fromJavaOptional(table.getActiveTimeline().getInstantsAsStream()
-            .filter(hoodieInstant -> 
actions.contains(hoodieInstant.getAction()))
-            .filter(hoodieInstant -> 
compareTimestamps(hoodieInstant.requestedTime(),
-                GREATER_THAN, logCompactionInstantTime))
-            .findFirst());
-    if (compactionInstantWithGreaterTimestamp.isPresent()) {
-      throw new HoodieLogCompactException(String.format("Cannot log compact 
since a compaction instant with greater "
-          + "timestamp exists. Instant details %s", 
compactionInstantWithGreaterTimestamp.get()));
-    }
+    try (HoodieTable<?, I, ?, T> table = createTable(config, 
context.getStorageConf())) {
+      // Check if a commit or compaction instant with a greater timestamp is 
on the timeline.
+      // If an instant is found then abort log compaction, since it is no 
longer needed.
+      Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, 
COMPACTION_ACTION);
+      Option<HoodieInstant> compactionInstantWithGreaterTimestamp =
+          
Option.fromJavaOptional(table.getActiveTimeline().getInstantsAsStream()
+              .filter(hoodieInstant -> 
actions.contains(hoodieInstant.getAction()))
+              .filter(hoodieInstant -> 
compareTimestamps(hoodieInstant.requestedTime(),
+                  GREATER_THAN, logCompactionInstantTime))
+              .findFirst());
+      if (compactionInstantWithGreaterTimestamp.isPresent()) {
+        throw new HoodieLogCompactException(String.format("Cannot log compact 
since a compaction instant with greater "
+            + "timestamp exists. Instant details %s", 
compactionInstantWithGreaterTimestamp.get()));
+      }
 
-    HoodieTimeline pendingLogCompactionTimeline = 
table.getActiveTimeline().filterPendingLogCompactionTimeline();
-    InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
-    HoodieInstant inflightInstant = 
instantGenerator.getLogCompactionInflightInstant(logCompactionInstantTime);
-    if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
-      log.info("Found Log compaction inflight file. Rolling back the commit 
and exiting.");
-      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> 
getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), 
txnManager);
-      table.getMetaClient().reloadActiveTimeline();
-      throw new HoodieException("Execution is aborted since it found an 
Inflight logcompaction,"
-          + "log compaction plans are mutable plans, so reschedule another 
logcompaction.");
-    }
-    logCompactionTimer = metrics.getLogCompactionCtx();
-    WriteMarkersFactory.get(config.getMarkersType(), table, 
logCompactionInstantTime);
-    HoodieWriteMetadata<T> writeMetadata = table.logCompact(context, 
logCompactionInstantTime);
-    HoodieWriteMetadata<T> updatedWriteMetadata = 
partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime, 
WriteOperationType.LOG_COMPACT);
-    HoodieWriteMetadata<O> logCompactionMetadata = 
convertToOutputMetadata(updatedWriteMetadata);
-    if (shouldComplete) {
-      commitLogCompaction(logCompactionInstantTime, logCompactionMetadata, 
Option.of(table));
+      HoodieTimeline pendingLogCompactionTimeline = 
table.getActiveTimeline().filterPendingLogCompactionTimeline();
+      InstantGenerator instantGenerator = 
table.getMetaClient().getInstantGenerator();
+      HoodieInstant inflightInstant = 
instantGenerator.getLogCompactionInflightInstant(logCompactionInstantTime);
+      if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
+        log.info("Found Log compaction inflight file. Rolling back the commit 
and exiting.");
+        table.rollbackInflightLogCompaction(inflightInstant, commitToRollback 
-> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), 
txnManager);
+        table.getMetaClient().reloadActiveTimeline();
+        throw new HoodieException("Execution is aborted since it found an 
Inflight logcompaction,"
+            + "log compaction plans are mutable plans, so reschedule another 
logcompaction.");
+      }
+      logCompactionTimer = metrics.getLogCompactionCtx();
+      WriteMarkersFactory.get(config.getMarkersType(), table, 
logCompactionInstantTime);
+      HoodieWriteMetadata<T> writeMetadata = table.logCompact(context, 
logCompactionInstantTime);
+      HoodieWriteMetadata<T> updatedWriteMetadata = 
partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime, 
WriteOperationType.LOG_COMPACT);
+      HoodieWriteMetadata<O> logCompactionMetadata = 
convertToOutputMetadata(updatedWriteMetadata);
+      if (shouldComplete) {
+        commitLogCompaction(logCompactionInstantTime, logCompactionMetadata, 
Option.of(table));

Review Comment:
   Good catch — done. I wrapped both `commitCompaction` and `commitLogCompaction` so that they close the table only when they created it themselves:
   
   ```java
   HoodieTable table = tableOpt.orElseGet(() -> createTable(config, context.getStorageConf()));
   try {
     completeCompaction(...);
   } finally {
     // Close only a table we created here; a caller-provided table is owned by the caller.
     if (!tableOpt.isPresent()) {
       table.close();
     }
   }
   ```
   
   This fixes the per-cycle FileSystemView / BitCaskDiskMap leak for the `Option.empty()` callers (StreamSync on every sync cycle, HoodieCompactor, HoodieSparkCompactor, and UpgradeDowngradeUtils), while a caller-provided table remains owned by the caller. Pushed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to