yihua commented on code in PR #19042:
URL: https://github.com/apache/hudi/pull/19042#discussion_r3445504957


##########
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieTableServiceClient.java:
##########
@@ -219,41 +220,41 @@ private void inlineCompaction(HoodieTable table, Option<Map<String, String>> ext
    * @return Collection of Write Status
    */
   protected HoodieWriteMetadata<O> logCompact(String logCompactionInstantTime, boolean shouldComplete) {
-    HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf());
-
-    // Check if a commit or compaction instant with a greater timestamp is on the timeline.
-    // If an instant is found then abort log compaction, since it is no longer needed.
-    Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, COMPACTION_ACTION);
-    Option<HoodieInstant> compactionInstantWithGreaterTimestamp =
-        Option.fromJavaOptional(table.getActiveTimeline().getInstantsAsStream()
-            .filter(hoodieInstant -> actions.contains(hoodieInstant.getAction()))
-            .filter(hoodieInstant -> compareTimestamps(hoodieInstant.requestedTime(),
-                GREATER_THAN, logCompactionInstantTime))
-            .findFirst());
-    if (compactionInstantWithGreaterTimestamp.isPresent()) {
-      throw new HoodieLogCompactException(String.format("Cannot log compact since a compaction instant with greater "
-          + "timestamp exists. Instant details %s", compactionInstantWithGreaterTimestamp.get()));
-    }
+    try (HoodieTable<?, I, ?, T> table = createTable(config, context.getStorageConf())) {
+      // Check if a commit or compaction instant with a greater timestamp is on the timeline.
+      // If an instant is found then abort log compaction, since it is no longer needed.
+      Set<String> actions = CollectionUtils.createSet(COMMIT_ACTION, COMPACTION_ACTION);
+      Option<HoodieInstant> compactionInstantWithGreaterTimestamp =
+          Option.fromJavaOptional(table.getActiveTimeline().getInstantsAsStream()
+              .filter(hoodieInstant -> actions.contains(hoodieInstant.getAction()))
+              .filter(hoodieInstant -> compareTimestamps(hoodieInstant.requestedTime(),
+                  GREATER_THAN, logCompactionInstantTime))
+              .findFirst());
+      if (compactionInstantWithGreaterTimestamp.isPresent()) {
+        throw new HoodieLogCompactException(String.format("Cannot log compact since a compaction instant with greater "
+            + "timestamp exists. Instant details %s", compactionInstantWithGreaterTimestamp.get()));
+      }
 
-    HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
-    InstantGenerator instantGenerator = table.getMetaClient().getInstantGenerator();
-    HoodieInstant inflightInstant = instantGenerator.getLogCompactionInflightInstant(logCompactionInstantTime);
-    if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
-      log.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
-      table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager);
-      table.getMetaClient().reloadActiveTimeline();
-      throw new HoodieException("Execution is aborted since it found an Inflight logcompaction,"
-          + "log compaction plans are mutable plans, so reschedule another logcompaction.");
-    }
-    logCompactionTimer = metrics.getLogCompactionCtx();
-    WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
-    HoodieWriteMetadata<T> writeMetadata = table.logCompact(context, logCompactionInstantTime);
-    HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime, WriteOperationType.LOG_COMPACT);
-    HoodieWriteMetadata<O> logCompactionMetadata = convertToOutputMetadata(updatedWriteMetadata);
-    if (shouldComplete) {
-      commitLogCompaction(logCompactionInstantTime, logCompactionMetadata, Option.of(table));
+      HoodieTimeline pendingLogCompactionTimeline = table.getActiveTimeline().filterPendingLogCompactionTimeline();
+      InstantGenerator instantGenerator = table.getMetaClient().getInstantGenerator();
+      HoodieInstant inflightInstant = instantGenerator.getLogCompactionInflightInstant(logCompactionInstantTime);
+      if (pendingLogCompactionTimeline.containsInstant(inflightInstant)) {
+        log.info("Found Log compaction inflight file. Rolling back the commit and exiting.");
+        table.rollbackInflightLogCompaction(inflightInstant, commitToRollback -> getPendingRollbackInfo(table.getMetaClient(), commitToRollback, false), txnManager);
+        table.getMetaClient().reloadActiveTimeline();
+        throw new HoodieException("Execution is aborted since it found an Inflight logcompaction,"
+            + "log compaction plans are mutable plans, so reschedule another logcompaction.");
+      }
+      logCompactionTimer = metrics.getLogCompactionCtx();
+      WriteMarkersFactory.get(config.getMarkersType(), table, logCompactionInstantTime);
+      HoodieWriteMetadata<T> writeMetadata = table.logCompact(context, logCompactionInstantTime);
+      HoodieWriteMetadata<T> updatedWriteMetadata = partialUpdateTableMetadata(table, writeMetadata, logCompactionInstantTime, WriteOperationType.LOG_COMPACT);
+      HoodieWriteMetadata<O> logCompactionMetadata = convertToOutputMetadata(updatedWriteMetadata);
+      if (shouldComplete) {
+        commitLogCompaction(logCompactionInstantTime, logCompactionMetadata, Option.of(table));

Review Comment:
   Here `logCompact` correctly passes `Option.of(table)` and closes the table via try-with-resources. However, `commitCompaction`/`commitLogCompaction` themselves (L384 / L448) call `tableOpt.orElseGet(() -> createTable(...))` and never close the table they create in that branch. So the per-cycle FileSystemView / `BitCaskDiskMap` leak this PR targets still occurs for every caller that passes `Option.empty()`: `StreamSync` (on every sync cycle), `HoodieCompactor`, `HoodieSparkCompactor`, and `UpgradeDowngradeUtils`. Suggest closing the table only when the method created it itself:
   
   ```java
   HoodieTable table = tableOpt.orElseGet(() -> createTable(config, context.getStorageConf()));
   try {
     completeCompaction(...);
   } finally {
     // Close only a table this method created; a caller-supplied table stays owned by the caller.
     if (!tableOpt.isPresent()) {
       table.close();
     }
   }
   ```
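A minimal, self-contained sketch of the borrow-or-own close pattern suggested above. `TrackedTable` is a hypothetical stand-in for `HoodieTable` that only counts open instances, `java.util.Optional` stands in for Hudi's `Option`, and `commit` only mirrors the shape of `commitCompaction`/`commitLogCompaction`, not their real signatures; the check inside the `try` is a placeholder for `completeCompaction(...)`.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public final class OwnershipAwareClose {

  /** Hypothetical stand-in for HoodieTable: it only tracks how many instances are still open. */
  public static final class TrackedTable implements AutoCloseable {
    public static final AtomicInteger OPEN = new AtomicInteger();
    private boolean closed;

    public TrackedTable() {
      OPEN.incrementAndGet();
    }

    public boolean isClosed() {
      return closed;
    }

    @Override
    public void close() {
      // Idempotent so a double close cannot drive the counter negative.
      if (!closed) {
        closed = true;
        OPEN.decrementAndGet();
      }
    }
  }

  /**
   * Same shape as commitCompaction/commitLogCompaction (assumed, simplified): use the caller's
   * table if one is given, otherwise create one and take ownership of it.
   */
  public static String commit(Optional<TrackedTable> tableOpt, Supplier<TrackedTable> createTable) {
    TrackedTable table = tableOpt.orElseGet(createTable);
    try {
      // Placeholder for completeCompaction(...): it only needs an open table.
      if (table.isClosed()) {
        throw new IllegalStateException("table already closed");
      }
      return "committed";
    } finally {
      // Close only the table this method created; a caller-supplied table stays owned by the caller.
      if (!tableOpt.isPresent()) {
        table.close();
      }
    }
  }

  public static void main(String[] args) {
    try (TrackedTable callerTable = new TrackedTable()) {
      commit(Optional.of(callerTable), TrackedTable::new);
      // The borrowed table is still open after commit returns.
      System.out.println("caller table closed after commit: " + callerTable.isClosed());
    }
    commit(Optional.empty(), TrackedTable::new);
    // The self-created table was closed inside commit, so nothing is left open.
    System.out.println("open tables: " + TrackedTable.OPEN.get());
  }
}
```

Closing in `finally` rather than with try-with-resources is what lets a single code path borrow the caller's table while still releasing a self-created one, including when the commit throws.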



##########
hudi-client/hudi-java-client/src/test/java/org/apache/hudi/table/TestHoodieTableResourceLifecycle.java:
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table;
+
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.common.engine.EngineType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.hudi.common.table.view.FileSystemViewStorageType;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.common.config.HoodieMetadataConfig;
+import org.apache.hudi.config.HoodieIndexConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.testutils.HoodieJavaClientTestHarness;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Verifies that a per-cycle {@link HoodieTable} releases the resources held by its lazily-built
+ * {@code FileSystemViewManager}. For a {@code SPILLABLE_DISK} view the file-group store spills to an
+ * on-disk {@code BitCaskDiskMap}; {@link HoodieTable#close()} must close the view manager so the disk
+ * map is cleaned up immediately instead of lingering until JVM exit.
+ */
+public class TestHoodieTableResourceLifecycle extends HoodieJavaClientTestHarness {
+
+  private HoodieWriteConfig spillableDiskViewConfig(String spillDir) {
+    // maxMemoryForView=0 forces every file-group entry to spill to a BitCaskDiskMap on first put;
+    // the metadata table is disabled so the only spillable store is the file-system view's file-group map.
+    return HoodieWriteConfig.newBuilder()
+        .withPath(basePath)
+        .withEngineType(EngineType.JAVA)
+        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .withParallelism(1, 1)
+        .withDeleteParallelism(1)
+        .withEmbeddedTimelineServerEnabled(false)
+        .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.INMEMORY).build())
+        .withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(false).build())
+        .withFileSystemViewConfig(FileSystemViewStorageConfig.newBuilder()
+            .withStorageType(FileSystemViewStorageType.SPILLABLE_DISK)
+            .withBaseStoreDir(spillDir)
+            .withMaxMemoryForView(0L)
+            .build())
+        .build();
+  }
+
+  @Test
+  public void testUnclosedHoodieTableLeavesSpillableViewOnDisk() throws Exception {
+    String spillDir = basePath + "/.view_spill";
+    HoodieWriteConfig writeConfig = spillableDiskViewConfig(spillDir);
+    try (HoodieJavaWriteClient<?> client = getHoodieWriteClient(writeConfig)) {
+      insertFirstBatch(writeConfig, client, "001", "000", 10, HoodieJavaWriteClient::insert, false, true, 10,
+          metaClient.getInstantGenerator());
+    }
+    metaClient = HoodieTableMetaClient.reload(metaClient);
+
+    // Mimic one table-service work unit on the driver: build a HoodieTable, materialize its spillable
+    // file-system view (forcing a spill to a BitCaskDiskMap), then drop the table without closing it.
+    HoodieTable<?, ?, ?, ?> table = HoodieJavaTable.create(writeConfig, context, metaClient);
+    table.getHoodieView().loadAllPartitions();
+    assertTrue(countDiskMapDirs(spillDir) > 0,
+        "Spilling the file-group view should create an on-disk BitCaskDiskMap directory");
+
+    table = null;
+    System.gc();
+    Thread.sleep(200);
+    // An unclosed HoodieTable leaves its disk map around until JVM exit; nothing closed the
+    // table -> view -> map, so GC does not release the on-disk resources.
+    assertTrue(countDiskMapDirs(spillDir) > 0,

Review Comment:
   This test asserts that resources are **not** freed, and the assertion is gated on `System.gc()` + `Thread.sleep(200)`. `System.gc()` is only a hint and GC is not guaranteed to run on request, so this is a latent CI flake. It also verifies the pre-fix *leak* rather than the fix itself. `testCloseReleasesSpillableFileSystemView` already covers the actual behavior; suggest dropping this test (or at minimum the `gc`/`sleep` calls).
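As a sketch of the deterministic alternative, the check below asserts only on what `close()` does, with no `System.gc()` or `sleep`. `SpillStore` and `countEntries` are hypothetical stand-ins (for the on-disk spill map and for the test's `countDiskMapDirs` helper), not Hudi APIs; they simply create a spill directory and delete it on close.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public final class DeterministicCloseCheck {

  /** Hypothetical stand-in for an on-disk spill map: it owns one directory until close(). */
  public static final class SpillStore implements AutoCloseable {
    private final Path dir;

    public SpillStore(Path baseDir) throws IOException {
      this.dir = Files.createTempDirectory(baseDir, "spill-");
      // Simulate one spilled entry so the directory is non-empty.
      Files.write(dir.resolve("data.log"), "entry".getBytes(StandardCharsets.UTF_8));
    }

    @Override
    public void close() throws IOException {
      List<Path> paths;
      try (Stream<Path> walk = Files.walk(dir)) {
        // Reverse order visits children before their parent directory.
        paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
      }
      for (Path p : paths) {
        Files.delete(p);
      }
    }
  }

  /** Counts entries directly under baseDir, in the spirit of the test's countDiskMapDirs helper. */
  public static long countEntries(Path baseDir) throws IOException {
    try (Stream<Path> entries = Files.list(baseDir)) {
      return entries.count();
    }
  }

  public static void main(String[] args) throws IOException {
    Path base = Files.createTempDirectory("view-spill-test");
    SpillStore store = new SpillStore(base);
    // The check depends only on close(), never on whether or when GC runs.
    if (countEntries(base) != 1) {
      throw new AssertionError("expected one spill directory before close");
    }
    store.close();
    if (countEntries(base) != 0) {
      throw new AssertionError("close() should remove the spill directory");
    }
    Files.delete(base);
    System.out.println("spill directory removed on close");
  }
}
```

Asserting on the positive outcome of `close()` keeps the test independent of JVM GC behavior and avoids the fixed 200 ms sleep.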



-- 
This is an automated message from the Apache Git Service.