This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 33a17babb75e3cccb73c1e52c28faa64cc836650
Author: Ratul Dawar <[email protected]>
AuthorDate: Fri Nov 7 22:52:24 2025 +0530

    fix: Move hudi split loaders to resumable tasks architecture to prevent deadlocks (#14225)
    
    * Move hudi split loaders to resumable tasks architecture to prevent deadlocks.
    
    * Address review comments: Add javadoc and debug logging
    
    * Add testcases to verify the fix
---
 .../hudi/partition/HudiPartitionInfoLoader.java    |  79 ++++++-
 .../hudi/split/HudiBackgroundSplitLoader.java      |  29 +--
 .../partition/TestHudiPartitionInfoLoader.java     | 261 +++++++++++++++++++++
 3 files changed, 339 insertions(+), 30 deletions(-)

diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java
index 0146cd4baea3..6cd21f13e2ed 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java
@@ -13,36 +13,60 @@
  */
 package io.trino.plugin.hudi.partition;
 
-import io.airlift.concurrent.MoreFutures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.log.Logger;
 import io.trino.plugin.hive.HivePartitionKey;
 import io.trino.plugin.hive.util.AsyncQueue;
+import io.trino.plugin.hive.util.ResumableTask;
+import io.trino.plugin.hive.util.ResumableTask.TaskStatus;
 import io.trino.plugin.hudi.query.HudiDirectoryLister;
 import io.trino.plugin.hudi.split.HudiSplitFactory;
 import io.trino.spi.connector.ConnectorSplit;
 import org.apache.hudi.common.model.FileSlice;
 
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
 
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
 public class HudiPartitionInfoLoader
-        implements Runnable
+        implements ResumableTask
 {
+    private static final Logger log = Logger.get(HudiPartitionInfoLoader.class);
+
     private final HudiDirectoryLister hudiDirectoryLister;
     private final HudiSplitFactory hudiSplitFactory;
     private final AsyncQueue<ConnectorSplit> asyncQueue;
     private final Deque<HiveHudiPartitionInfo> partitionQueue;
     private final String commitTime;
     private final boolean useIndex;
+    private final Deque<Iterator<ConnectorSplit>> splitIterators;
 
     private boolean isRunning;
 
+    /**
+     * Creates a new split loader.
+     *
+     * @param hudiDirectoryLister Service for listing files in a partition.
+     * @param commitTime The latest Hudi commit time for snapshot isolation.
+     * @param hudiSplitFactory Factory to generate {@link ConnectorSplit}s.
+     * @param asyncQueue The output queue to send generated splits to.
+     * @param partitionQueue The input queue of partitions to process.
+     * @param useIndex Whether to use the metadata index for file listing.
+     * @param splitIterators A deque, private to this worker, used to store
+     * partially processed split iterators. This allows the task to save
+     * its state when yielding (e.g., when the asyncQueue is full) and
+     * resume processing from the same point.
+     */
     public HudiPartitionInfoLoader(
             HudiDirectoryLister hudiDirectoryLister,
             String commitTime,
             HudiSplitFactory hudiSplitFactory,
             AsyncQueue<ConnectorSplit> asyncQueue,
             Deque<HiveHudiPartitionInfo> partitionQueue,
-            boolean useIndex)
+            boolean useIndex,
+            Deque<Iterator<ConnectorSplit>> splitIterators)
     {
         this.hudiDirectoryLister = hudiDirectoryLister;
         this.commitTime = commitTime;
@@ -51,28 +75,59 @@ public class HudiPartitionInfoLoader
         this.partitionQueue = partitionQueue;
         this.isRunning = true;
         this.useIndex = useIndex;
+        this.splitIterators = splitIterators;
     }
 
     @Override
-    public void run()
+    public TaskStatus process()
+    {
+        while (isRunning || (!partitionQueue.isEmpty() || !splitIterators.isEmpty())) {
+            try {
+                ListenableFuture<Void> future = loadSplits();
+                if (!future.isDone()) {
+                    return TaskStatus.continueOn(future);
+                }
+            }
+            catch (Exception e) {
+                throw new RuntimeException("Error loading splits", e);
+            }
+        }
+
+        return TaskStatus.finished();
+    }
+
+    private ListenableFuture<Void> loadSplits()
     {
-        while (isRunning || !partitionQueue.isEmpty()) {
-            HiveHudiPartitionInfo hudiPartitionInfo = partitionQueue.poll();
+        Iterator<ConnectorSplit> splits = splitIterators.poll();
+        if (splits == null) {
+            HiveHudiPartitionInfo partition = partitionQueue.poll();
+            if (partition == null) {
+                return immediateVoidFuture();
+            }
+            splits = generateSplitsFromPartition(partition);
+        }
 
-            if (hudiPartitionInfo != null && hudiPartitionInfo.getHivePartitionName() != null) {
-                generateSplitsFromPartition(hudiPartitionInfo);
+        while (splits.hasNext()) {
+            ConnectorSplit split = splits.next();
+            ListenableFuture<Void> future = asyncQueue.offer(split);
+            if (!future.isDone()) {
+                log.debug("AsyncQueue is full, yielding split loader task");
+                splitIterators.addFirst(splits);
+                return future;
             }
         }
+
+        return immediateVoidFuture();
     }
 
-    private void generateSplitsFromPartition(HiveHudiPartitionInfo hudiPartitionInfo)
+    private Iterator<ConnectorSplit> generateSplitsFromPartition(HiveHudiPartitionInfo hudiPartitionInfo)
     {
         List<HivePartitionKey> partitionKeys = hudiPartitionInfo.getHivePartitionKeys();
         List<FileSlice> partitionFileSlices = hudiDirectoryLister.listStatus(hudiPartitionInfo, useIndex);
-        partitionFileSlices.stream()
+        return partitionFileSlices.stream()
                 .flatMap(slice -> hudiSplitFactory.createSplits(partitionKeys, slice, this.commitTime).stream())
-                .map(asyncQueue::offer)
-                .forEachOrdered(MoreFutures::getFutureValue);
+                .map(ConnectorSplit.class::cast)
+                .iterator();
     }
 
     public void stopRunning()
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
index 5f9ef5957168..999e36f1ea0d 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
@@ -25,13 +25,13 @@ import io.trino.metastore.Partition;
 import io.trino.metastore.StorageFormat;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.util.AsyncQueue;
+import io.trino.plugin.hive.util.ResumableTasks;
 import io.trino.plugin.hudi.HudiTableHandle;
 import io.trino.plugin.hudi.partition.HiveHudiPartitionInfo;
 import io.trino.plugin.hudi.partition.HudiPartitionInfoLoader;
 import io.trino.plugin.hudi.query.HudiDirectoryLister;
 import io.trino.plugin.hudi.query.index.HudiPartitionStatsIndexSupport;
 import io.trino.plugin.hudi.query.index.IndexSupportFactory;
-import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplit;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -50,11 +50,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HexFormat;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
@@ -64,7 +64,6 @@ import java.util.stream.IntStream;
 
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.airlift.concurrent.MoreFutures.addExceptionCallback;
-import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT;
 import static io.trino.plugin.hudi.HudiSessionProperties.getSplitGeneratorParallelism;
 import static io.trino.plugin.hudi.HudiSessionProperties.getTargetSplitSize;
 import static io.trino.plugin.hudi.HudiSessionProperties.isMetadataPartitionListingEnabled;
@@ -155,10 +154,11 @@ public class HudiBackgroundSplitLoader
         Executor splitGeneratorExecutor = new BoundedExecutor(executor, splitGeneratorParallelism);
 
         for (int i = 0; i < splitGeneratorParallelism; i++) {
+            Deque<Iterator<ConnectorSplit>> splitIterators = new ConcurrentLinkedDeque<>();
             HudiPartitionInfoLoader generator = new HudiPartitionInfoLoader(hudiDirectoryLister, tableHandle.getLatestCommitTime(), hudiSplitFactory,
-                    asyncQueue, partitionQueue, useIndex);
+                    asyncQueue, partitionQueue, useIndex, splitIterators);
             splitGenerators.add(generator);
-            ListenableFuture<Void> future = Futures.submit(generator, splitGeneratorExecutor);
+            ListenableFuture<Void> future = ResumableTasks.submit(splitGeneratorExecutor, generator);
             addExceptionCallback(future, errorListener);
             futures.add(future);
         }
@@ -166,19 +166,12 @@ public class HudiBackgroundSplitLoader
         // Signal all generators to stop once partition queue is drained
         splitGenerators.forEach(HudiPartitionInfoLoader::stopRunning);
 
-        log.info("Wait for partition pruning split generation to finish on 
table %s.%s", tableHandle.getSchemaName(), tableHandle.getTableName());
-        try {
-            Futures.whenAllComplete(futures)
-                    .run(asyncQueue::finish, directExecutor())
-                    .get();
-            log.info("Partition pruning split generation finished on table 
%s.%s", tableHandle.getSchemaName(), tableHandle.getTableName());
-        }
-        catch (InterruptedException | ExecutionException e) {
-            if (e instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
-            throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e);
-        }
+        Futures.whenAllComplete(futures)
+                .run(() -> {
+                    asyncQueue.finish();
+                    log.info("Partition pruning split generation finished on 
table %s.%s", tableHandle.getSchemaName(), tableHandle.getTableName());
+                }, directExecutor());
+        log.info("Started partition pruning split generation on table %s.%s", 
tableHandle.getSchemaName(), tableHandle.getTableName());
     }
 
     private Deque<HiveHudiPartitionInfo> getPartitionInfos(boolean useIndex)
diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java
new file mode 100644
index 000000000000..24c1b16dff5f
--- /dev/null
+++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi.partition;
+
+import com.google.common.collect.ImmutableList;
+import io.airlift.units.DataSize;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.cache.DefaultCachingHostAddressProvider;
+import io.trino.metastore.Partition;
+import io.trino.metastore.StorageFormat;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.util.AsyncQueue;
+import io.trino.plugin.hive.util.ResumableTask.TaskStatus;
+import io.trino.plugin.hive.util.ThrottledAsyncQueue;
+import io.trino.plugin.hudi.HudiTableHandle;
+import io.trino.plugin.hudi.query.HudiDirectoryLister;
+import io.trino.plugin.hudi.split.HudiSplitFactory;
+import io.trino.plugin.hudi.split.HudiSplitWeightProvider;
+import io.trino.plugin.hudi.split.SizeBasedSplitWeightProvider;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.predicate.TupleDomain;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
+
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestHudiPartitionInfoLoader
+{
+    private static final String COMMIT_TIME = "20250625153731546";
+    private static final String TABLE_PATH = "/test/table/path";
+
+    @Test
+    public void testLoaderCreation()
+    {
+        // Test that we can create a HudiPartitionInfoLoader with valid inputs
+        try (TestHudiDirectoryLister directoryLister = new TestHudiDirectoryLister()) {
+            HudiSplitFactory splitFactory = createSplitFactory();
+            AsyncQueue<ConnectorSplit> asyncQueue = new ThrottledAsyncQueue<>(
+                    100,
+                    1000,
+                    Executors.newSingleThreadExecutor());
+            Deque<HiveHudiPartitionInfo> partitionQueue = new ConcurrentLinkedDeque<>();
+            Deque<Iterator<ConnectorSplit>> splitIterators = new ConcurrentLinkedDeque<>();
+
+            HudiPartitionInfoLoader loader = new HudiPartitionInfoLoader(
+                    directoryLister,
+                    COMMIT_TIME,
+                    splitFactory,
+                    asyncQueue,
+                    partitionQueue,
+                    false,
+                    splitIterators);
+
+            assertThat(loader).isNotNull();
+        }
+    }
+
+    @Test
+    public void testLoaderYieldsAndResumesWithMultiplePartitions()
+    {
+        // This test verifies the fix for https://github.com/trinodb/trino/issues/26967
+        // Simulates the deadlock scenario: multiple partitions generating splits with a small queue
+        // The loader should:
+        // 1. Yield when queue is full (return TaskStatus.continueOn)
+        // 2. Save its state (current split iterator)
+        // 3. Resume from saved state on next process() call
+        // 4. Complete without blocking threads
+
+        try (TestHudiDirectoryListerWithSplits directoryLister = new TestHudiDirectoryListerWithSplits()) {
+            HudiSplitFactory splitFactory = createSplitFactory();
+
+            // Create a queue with capacity of 1 to trigger full condition immediately
+            AsyncQueue<ConnectorSplit> asyncQueue = new ThrottledAsyncQueue<>(
+                    1000,  // maxSplitsPerSecond
+                    1,     // maxOutstandingSplits - very small to trigger yielding
+                    Executors.newSingleThreadExecutor());
+
+            // Create multiple partitions to process
+            Deque<HiveHudiPartitionInfo> partitionQueue = new ConcurrentLinkedDeque<>();
+            partitionQueue.add(createTestPartition("partition1"));
+            partitionQueue.add(createTestPartition("partition2"));
+            partitionQueue.add(createTestPartition("partition3"));
+
+            Deque<Iterator<ConnectorSplit>> splitIterators = new ConcurrentLinkedDeque<>();
+
+            HudiPartitionInfoLoader loader = new HudiPartitionInfoLoader(
+                    directoryLister,
+                    COMMIT_TIME,
+                    splitFactory,
+                    asyncQueue,
+                    partitionQueue,
+                    false,
+                    splitIterators);
+
+            // First process() call - should start processing first partition
+            TaskStatus status1 = loader.process();
+
+            // With a queue capacity of 1 and multiple splits per partition,
+            // the loader should yield after adding the first split
+            // The key fix: it returns TaskStatus.continueOn(future) instead of blocking
+            if (!status1.isFinished()) {
+                assertThat(status1.isFinished()).isFalse();
+                // Verify that state was saved - splitIterators should have the current iterator
+                assertThat(splitIterators).isNotEmpty();
+            }
+
+            // Signal to stop processing new partitions
+            loader.stopRunning();
+
+            // Continue processing until finished
+            // The loader should be able to resume and complete without deadlock
+            int maxIterations = 100; // Safety limit to prevent infinite loop in test
+            int iterations = 0;
+            TaskStatus currentStatus = status1;
+
+            while (!currentStatus.isFinished() && iterations < maxIterations) {
+                // Simulate consuming from the queue to make space
+                asyncQueue.getBatchAsync(10);
+
+                // Resume processing
+                currentStatus = loader.process();
+                iterations++;
+            }
+
+            // Verify the loader completed successfully without blocking
+            assertThat(currentStatus.isFinished()).isTrue();
+            assertThat(iterations).isLessThan(maxIterations);
+        }
+    }
+
+    private static HudiSplitFactory createSplitFactory()
+    {
+        HudiTableHandle tableHandle = new HudiTableHandle(
+                "test_schema",
+                "test_table",
+                TABLE_PATH,
+                HoodieTableType.COPY_ON_WRITE,
+                ImmutableList.of(),
+                TupleDomain.all(),
+                TupleDomain.all(),
+                "",
+                "101");
+        HudiSplitWeightProvider weightProvider = new SizeBasedSplitWeightProvider(0.05, DataSize.of(128, MEGABYTE));
+        return new HudiSplitFactory(tableHandle, weightProvider, DataSize.of(128, MEGABYTE), new DefaultCachingHostAddressProvider());
+    }
+
+    private static HiveHudiPartitionInfo createTestPartition(String partitionPath)
+    {
+        SchemaTableName schemaTableName = new SchemaTableName("test_schema", "test_table");
+        Location tableLocation = Location.of(TABLE_PATH);
+        String partitionLocation = TABLE_PATH + "/" + partitionPath;
+
+        Partition partition = Partition.builder()
+                .setDatabaseName("test_schema")
+                .setTableName("test_table")
+                .setValues(ImmutableList.of())
+                .withStorage(storageBuilder -> storageBuilder
+                        .setStorageFormat(StorageFormat.create("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+                                "org.apache.hadoop.mapred.TextInputFormat",
+                                "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
+                        .setLocation(partitionLocation))
+                .setColumns(ImmutableList.of())
+                .build();
+
+        return new HiveHudiPartitionInfo(
+                schemaTableName,
+                tableLocation,
+                partitionPath,
+                partition,
+                ImmutableList.<HiveColumnHandle>of(),
+                TupleDomain.all());
+    }
+
+    private static FileSlice createTestFileSlice(String partitionPath, int fileNumber)
+    {
+        String fileId = "test-file-" + fileNumber;
+        HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partitionPath, fileId);
+        long blockSize = 8L * 1024 * 1024;
+        String baseFilePath = TABLE_PATH + "/" + partitionPath + "/" + fileId + "_" + COMMIT_TIME + ".parquet";
+
+        StoragePathInfo baseFileInfo = new StoragePathInfo(
+                new StoragePath(baseFilePath),
+                DataSize.of(10, MEGABYTE).toBytes(),
+                false,
+                (short) 0,
+                blockSize,
+                System.currentTimeMillis());
+
+        HoodieBaseFile baseFile = new HoodieBaseFile(baseFileInfo);
+        return new FileSlice(fileGroupId, COMMIT_TIME, baseFile, ImmutableList.of());
+    }
+
+    // Test implementation of HudiDirectoryLister that returns empty file slices
+    private static class TestHudiDirectoryLister
+            implements HudiDirectoryLister
+    {
+        @Override
+        public List<FileSlice> listStatus(HudiPartitionInfo partition, boolean useIndex)
+        {
+            // Return empty list for testing
+            return ImmutableList.of();
+        }
+
+        @Override
+        public void close()
+        {
+            // No-op for testing
+        }
+    }
+
+    // Test implementation of HudiDirectoryLister that returns file slices with splits
+    private static class TestHudiDirectoryListerWithSplits
+            implements HudiDirectoryLister
+    {
+        @Override
+        public List<FileSlice> listStatus(HudiPartitionInfo partition, boolean useIndex)
+        {
+            // Return multiple file slices per partition to trigger queue full condition
+            String partitionPath = partition.getRelativePartitionPath();
+            List<FileSlice> fileSlices = new ArrayList<>();
+
+            // Create 3 file slices per partition
+            for (int i = 0; i < 3; i++) {
+                fileSlices.add(createTestFileSlice(partitionPath, i));
+            }
+
+            return fileSlices;
+        }
+
+        @Override
+        public void close()
+        {
+            // No-op for testing
+        }
+    }
+}
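
The resumable-task pattern this change adopts comes down to a process() method that does a bounded amount of work and, when the downstream AsyncQueue signals backpressure, returns a status tied to the queue's future instead of blocking the executor thread. Below is a minimal sketch of that contract, using the ResumableTask, TaskStatus, and AsyncQueue APIs as they appear in the patch; the DemoSplitTask class and its String items are hypothetical stand-ins for the loader's split iterator and output queue, not part of the commit.

import com.google.common.util.concurrent.ListenableFuture;
import io.trino.plugin.hive.util.AsyncQueue;
import io.trino.plugin.hive.util.ResumableTask;
import io.trino.plugin.hive.util.ResumableTask.TaskStatus;

import java.util.Iterator;

class DemoSplitTask
        implements ResumableTask
{
    private final Iterator<String> items;   // stands in for the split iterator
    private final AsyncQueue<String> queue;  // stands in for asyncQueue

    DemoSplitTask(Iterator<String> items, AsyncQueue<String> queue)
    {
        this.items = items;
        this.queue = queue;
    }

    @Override
    public TaskStatus process()
    {
        while (items.hasNext()) {
            // offer() enqueues the element and returns a future used for backpressure
            ListenableFuture<Void> future = queue.offer(items.next());
            if (!future.isDone()) {
                // Queue is full: yield the thread and resume here once it drains,
                // rather than blocking on the future as the old Runnable-based loader did
                return TaskStatus.continueOn(future);
            }
        }
        return TaskStatus.finished();
    }
}

Such a task is started with ResumableTasks.submit(executor, task), as the patched HudiBackgroundSplitLoader does for each HudiPartitionInfoLoader; when the continuation future completes, process() is invoked again and the saved iterator (a field here, the splitIterators deque in the patch) lets the work pick up where it yielded.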
