This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.1.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 33a17babb75e3cccb73c1e52c28faa64cc836650
Author: Ratul Dawar <[email protected]>
AuthorDate: Fri Nov 7 22:52:24 2025 +0530

    fix: Move hudi split loaders to resumable tasks architecture to prevent deadlocks (#14225)

    * Move hudi split loaders to resumable tasks architecture to prevent deadlocks.

    * Address review comments: Add javadoc and debug logging

    * Add testcases to verify the fix
---
 .../hudi/partition/HudiPartitionInfoLoader.java    |  79 ++++++-
 .../hudi/split/HudiBackgroundSplitLoader.java      |  29 +--
 .../partition/TestHudiPartitionInfoLoader.java     | 261 +++++++++++++++++++++
 3 files changed, 339 insertions(+), 30 deletions(-)
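In the old design (the removed lines in the diff below), each loader ran as a plain Runnable: it called asyncQueue.offer(split) and then blocked its executor thread via MoreFutures::getFutureValue until the bounded queue had room again. Under a full queue this can park every worker in the BoundedExecutor and deadlock the query (see https://github.com/trinodb/trino/issues/26967, referenced in the new test). The resumable-task model instead has process() return TaskStatus.continueOn(future): the partially drained iterator is parked in splitIterators, the thread is handed back to the pool, and the task is re-submitted once the queue signals free capacity. Below is a minimal, self-contained sketch of that contract; BoundedQueue, Producer, and Status are illustrative stand-ins for Trino's AsyncQueue, HudiPartitionInfoLoader, and ResumableTask.TaskStatus, not the real classes.

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;

import static com.google.common.util.concurrent.Futures.immediateVoidFuture;

public class YieldingSplitProducerSketch
{
    // Stand-in for ResumableTask.TaskStatus: either "done" or
    // "re-run me once this future completes".
    record Status(boolean isFinished, ListenableFuture<?> continuation)
    {
        static Status finished()
        {
            return new Status(true, immediateVoidFuture());
        }

        static Status continueOn(ListenableFuture<?> future)
        {
            return new Status(false, future);
        }
    }

    // Stand-in for AsyncQueue: offer() enqueues the element, returning a
    // completed future while there is room and a pending "not full" future
    // once capacity is reached.
    static class BoundedQueue<T>
    {
        private final Deque<T> elements = new ArrayDeque<>();
        private final int capacity;
        private SettableFuture<Void> notFull = SettableFuture.create();

        BoundedQueue(int capacity)
        {
            this.capacity = capacity;
        }

        synchronized ListenableFuture<Void> offer(T element)
        {
            elements.add(element);
            return elements.size() < capacity ? immediateVoidFuture() : notFull;
        }

        synchronized T poll()
        {
            T element = elements.poll();
            SettableFuture<Void> toComplete = notFull;
            notFull = SettableFuture.create();
            toComplete.set(null); // wake any yielded producer
            return element;
        }
    }

    // The producer drains one iterator at a time. On push-back it parks the
    // iterator and yields, instead of blocking an executor thread the way
    // the old MoreFutures::getFutureValue call did.
    static class Producer
    {
        private final BoundedQueue<String> queue;
        private final Deque<Iterator<String>> pendingIterators = new ArrayDeque<>();

        Producer(BoundedQueue<String> queue, List<Iterator<String>> work)
        {
            this.queue = queue;
            this.pendingIterators.addAll(work);
        }

        Status process()
        {
            while (!pendingIterators.isEmpty()) {
                Iterator<String> splits = pendingIterators.poll();
                while (splits.hasNext()) {
                    ListenableFuture<Void> future = queue.offer(splits.next());
                    if (!future.isDone()) {
                        pendingIterators.addFirst(splits); // save position for resume
                        return Status.continueOn(future); // free the thread
                    }
                }
            }
            return Status.finished();
        }
    }
}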
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java
index 0146cd4baea3..6cd21f13e2ed 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/partition/HudiPartitionInfoLoader.java
@@ -13,36 +13,60 @@
  */
 package io.trino.plugin.hudi.partition;
 
-import io.airlift.concurrent.MoreFutures;
+import com.google.common.util.concurrent.ListenableFuture;
+import io.airlift.log.Logger;
 import io.trino.plugin.hive.HivePartitionKey;
 import io.trino.plugin.hive.util.AsyncQueue;
+import io.trino.plugin.hive.util.ResumableTask;
+import io.trino.plugin.hive.util.ResumableTask.TaskStatus;
 import io.trino.plugin.hudi.query.HudiDirectoryLister;
 import io.trino.plugin.hudi.split.HudiSplitFactory;
 import io.trino.spi.connector.ConnectorSplit;
 import org.apache.hudi.common.model.FileSlice;
 
 import java.util.Deque;
+import java.util.Iterator;
 import java.util.List;
 
+import static com.google.common.util.concurrent.Futures.immediateVoidFuture;
+
 public class HudiPartitionInfoLoader
-        implements Runnable
+        implements ResumableTask
 {
+    private static final Logger log = Logger.get(HudiPartitionInfoLoader.class);
+
     private final HudiDirectoryLister hudiDirectoryLister;
     private final HudiSplitFactory hudiSplitFactory;
     private final AsyncQueue<ConnectorSplit> asyncQueue;
     private final Deque<HiveHudiPartitionInfo> partitionQueue;
     private final String commitTime;
     private final boolean useIndex;
+    private final Deque<Iterator<ConnectorSplit>> splitIterators;
 
     private boolean isRunning;
 
+    /**
+     * Creates a new split loader.
+     *
+     * @param hudiDirectoryLister Service for listing files in a partition.
+     * @param commitTime The latest Hudi commit time for snapshot isolation.
+     * @param hudiSplitFactory Factory to generate {@link ConnectorSplit}s.
+     * @param asyncQueue The output queue to send generated splits to.
+     * @param partitionQueue The input queue of partitions to process.
+     * @param useIndex Whether to use the metadata index for file listing.
+     * @param splitIterators A deque, private to this worker, used to store
+     *        partially processed split iterators. This allows the task to save
+     *        its state when yielding (e.g., when the asyncQueue is full) and
+     *        resume processing from the same point.
+     */
     public HudiPartitionInfoLoader(
             HudiDirectoryLister hudiDirectoryLister,
             String commitTime,
             HudiSplitFactory hudiSplitFactory,
             AsyncQueue<ConnectorSplit> asyncQueue,
             Deque<HiveHudiPartitionInfo> partitionQueue,
-            boolean useIndex)
+            boolean useIndex,
+            Deque<Iterator<ConnectorSplit>> splitIterators)
     {
         this.hudiDirectoryLister = hudiDirectoryLister;
         this.commitTime = commitTime;
@@ -51,28 +75,59 @@ public class HudiPartitionInfoLoader
         this.partitionQueue = partitionQueue;
         this.isRunning = true;
         this.useIndex = useIndex;
+        this.splitIterators = splitIterators;
     }
 
     @Override
-    public void run()
+    public TaskStatus process()
+    {
+        while (isRunning || (!partitionQueue.isEmpty() || !splitIterators.isEmpty())) {
+            try {
+                ListenableFuture<Void> future = loadSplits();
+                if (!future.isDone()) {
+                    return TaskStatus.continueOn(future);
+                }
+            }
+            catch (Exception e) {
+                throw new RuntimeException("Error loading splits", e);
+            }
+        }
+
+        return TaskStatus.finished();
+    }
+
+    private ListenableFuture<Void> loadSplits()
     {
-        while (isRunning || !partitionQueue.isEmpty()) {
-            HiveHudiPartitionInfo hudiPartitionInfo = partitionQueue.poll();
+        Iterator<ConnectorSplit> splits = splitIterators.poll();
+        if (splits == null) {
+            HiveHudiPartitionInfo partition = partitionQueue.poll();
+            if (partition == null) {
+                return immediateVoidFuture();
+            }
+            splits = generateSplitsFromPartition(partition);
+        }
 
-            if (hudiPartitionInfo != null && hudiPartitionInfo.getHivePartitionName() != null) {
-                generateSplitsFromPartition(hudiPartitionInfo);
+        while (splits.hasNext()) {
+            ConnectorSplit split = splits.next();
+            ListenableFuture<Void> future = asyncQueue.offer(split);
+            if (!future.isDone()) {
+                log.debug("AsyncQueue is full, yielding split loader task");
+                splitIterators.addFirst(splits);
+                return future;
             }
         }
+
+        return immediateVoidFuture();
     }
 
-    private void generateSplitsFromPartition(HiveHudiPartitionInfo hudiPartitionInfo)
+    private Iterator<ConnectorSplit> generateSplitsFromPartition(HiveHudiPartitionInfo hudiPartitionInfo)
     {
         List<HivePartitionKey> partitionKeys = hudiPartitionInfo.getHivePartitionKeys();
         List<FileSlice> partitionFileSlices = hudiDirectoryLister.listStatus(hudiPartitionInfo, useIndex);
-        partitionFileSlices.stream()
+        return partitionFileSlices.stream()
                 .flatMap(slice -> hudiSplitFactory.createSplits(partitionKeys, slice, this.commitTime).stream())
-                .map(asyncQueue::offer)
-                .forEachOrdered(MoreFutures::getFutureValue);
+                .map(ConnectorSplit.class::cast)
+                .iterator();
     }
 
     public void stopRunning()
diff --git a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
index 5f9ef5957168..999e36f1ea0d 100644
--- a/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
+++ b/hudi-trino-plugin/src/main/java/io/trino/plugin/hudi/split/HudiBackgroundSplitLoader.java
@@ -25,13 +25,13 @@ import io.trino.metastore.Partition;
 import io.trino.metastore.StorageFormat;
 import io.trino.plugin.hive.HiveColumnHandle;
 import io.trino.plugin.hive.util.AsyncQueue;
+import io.trino.plugin.hive.util.ResumableTasks;
 import io.trino.plugin.hudi.HudiTableHandle;
 import io.trino.plugin.hudi.partition.HiveHudiPartitionInfo;
 import io.trino.plugin.hudi.partition.HudiPartitionInfoLoader;
 import io.trino.plugin.hudi.query.HudiDirectoryLister;
 import io.trino.plugin.hudi.query.index.HudiPartitionStatsIndexSupport;
 import io.trino.plugin.hudi.query.index.IndexSupportFactory;
-import io.trino.spi.TrinoException;
 import io.trino.spi.connector.ConnectorSession;
 import io.trino.spi.connector.ConnectorSplit;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -50,11 +50,11 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HexFormat;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.function.Consumer;
@@ -64,7 +64,6 @@ import java.util.stream.IntStream;
 
 import static com.google.common.util.concurrent.MoreExecutors.directExecutor;
 import static io.airlift.concurrent.MoreFutures.addExceptionCallback;
-import static io.trino.plugin.hudi.HudiErrorCode.HUDI_CANNOT_OPEN_SPLIT;
 import static io.trino.plugin.hudi.HudiSessionProperties.getSplitGeneratorParallelism;
 import static io.trino.plugin.hudi.HudiSessionProperties.getTargetSplitSize;
 import static io.trino.plugin.hudi.HudiSessionProperties.isMetadataPartitionListingEnabled;
@@ -155,10 +154,11 @@ public class HudiBackgroundSplitLoader
         Executor splitGeneratorExecutor = new BoundedExecutor(executor, splitGeneratorParallelism);
         for (int i = 0; i < splitGeneratorParallelism; i++) {
+            Deque<Iterator<ConnectorSplit>> splitIterators = new ConcurrentLinkedDeque<>();
             HudiPartitionInfoLoader generator = new HudiPartitionInfoLoader(hudiDirectoryLister, tableHandle.getLatestCommitTime(), hudiSplitFactory,
-                    asyncQueue, partitionQueue, useIndex);
+                    asyncQueue, partitionQueue, useIndex, splitIterators);
             splitGenerators.add(generator);
-            ListenableFuture<Void> future = Futures.submit(generator, splitGeneratorExecutor);
+            ListenableFuture<Void> future = ResumableTasks.submit(splitGeneratorExecutor, generator);
             addExceptionCallback(future, errorListener);
             futures.add(future);
         }
 
@@ -166,19 +166,12 @@
         // Signal all generators to stop once partition queue is drained
         splitGenerators.forEach(HudiPartitionInfoLoader::stopRunning);
 
-        log.info("Wait for partition pruning split generation to finish on table %s.%s", tableHandle.getSchemaName(), tableHandle.getTableName());
-        try {
-            Futures.whenAllComplete(futures)
-                    .run(asyncQueue::finish, directExecutor())
-                    .get();
-            log.info("Partition pruning split generation finished on table %s.%s", tableHandle.getSchemaName(), tableHandle.getTableName());
-        }
-        catch (InterruptedException | ExecutionException e) {
-            if (e instanceof InterruptedException) {
-                Thread.currentThread().interrupt();
-            }
-            throw new TrinoException(HUDI_CANNOT_OPEN_SPLIT, "Error generating Hudi split", e);
-        }
+        Futures.whenAllComplete(futures)
+                .run(() -> {
+                    asyncQueue.finish();
+                    log.info("Partition pruning split generation finished on table %s.%s", tableHandle.getSchemaName(), tableHandle.getTableName());
+                }, directExecutor());
+        log.info("Started partition pruning split generation on table %s.%s", tableHandle.getSchemaName(), tableHandle.getTableName());
     }
 
     private Deque<HiveHudiPartitionInfo> getPartitionInfos(boolean useIndex)
diff --git a/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java
new file mode 100644
index 000000000000..24c1b16dff5f
--- /dev/null
+++ b/hudi-trino-plugin/src/test/java/io/trino/plugin/hudi/partition/TestHudiPartitionInfoLoader.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.hudi.partition;
+
+import com.google.common.collect.ImmutableList;
+import io.airlift.units.DataSize;
+import io.trino.filesystem.Location;
+import io.trino.filesystem.cache.DefaultCachingHostAddressProvider;
+import io.trino.metastore.Partition;
+import io.trino.metastore.StorageFormat;
+import io.trino.plugin.hive.HiveColumnHandle;
+import io.trino.plugin.hive.util.AsyncQueue;
+import io.trino.plugin.hive.util.ResumableTask.TaskStatus;
+import io.trino.plugin.hive.util.ThrottledAsyncQueue;
+import io.trino.plugin.hudi.HudiTableHandle;
+import io.trino.plugin.hudi.query.HudiDirectoryLister;
+import io.trino.plugin.hudi.split.HudiSplitFactory;
+import io.trino.plugin.hudi.split.HudiSplitWeightProvider;
+import io.trino.plugin.hudi.split.SizeBasedSplitWeightProvider;
+import io.trino.spi.connector.ConnectorSplit;
+import io.trino.spi.connector.SchemaTableName;
+import io.trino.spi.predicate.TupleDomain;
+import org.apache.hudi.common.model.FileSlice;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.storage.StoragePath;
+import org.apache.hudi.storage.StoragePathInfo;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Executors;
+
+import static io.airlift.units.DataSize.Unit.MEGABYTE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class TestHudiPartitionInfoLoader
+{
+    private static final String COMMIT_TIME = "20250625153731546";
+    private static final String TABLE_PATH = "/test/table/path";
+
+    @Test
+    public void testLoaderCreation()
+    {
+        // Test that we can create a HudiPartitionInfoLoader with valid inputs
+        try (TestHudiDirectoryLister directoryLister = new TestHudiDirectoryLister()) {
+            HudiSplitFactory splitFactory = createSplitFactory();
+            AsyncQueue<ConnectorSplit> asyncQueue = new ThrottledAsyncQueue<>(
+                    100,
+                    1000,
+                    Executors.newSingleThreadExecutor());
+            Deque<HiveHudiPartitionInfo> partitionQueue = new ConcurrentLinkedDeque<>();
+            Deque<Iterator<ConnectorSplit>> splitIterators = new ConcurrentLinkedDeque<>();
+
+            HudiPartitionInfoLoader loader = new HudiPartitionInfoLoader(
+                    directoryLister,
+                    COMMIT_TIME,
+                    splitFactory,
+                    asyncQueue,
+                    partitionQueue,
+                    false,
+                    splitIterators);
+
+            assertThat(loader).isNotNull();
+        }
+    }
+
+    @Test
+    public void testLoaderYieldsAndResumesWithMultiplePartitions()
+    {
+        // This test verifies the fix for https://github.com/trinodb/trino/issues/26967
+        // Simulates the deadlock scenario: multiple partitions generating splits with a small queue
+        // The loader should:
+        // 1. Yield when queue is full (return TaskStatus.continueOn)
+        // 2. Save its state (current split iterator)
+        // 3. Resume from saved state on next process() call
+        // 4. Complete without blocking threads
+
+        try (TestHudiDirectoryListerWithSplits directoryLister = new TestHudiDirectoryListerWithSplits()) {
+            HudiSplitFactory splitFactory = createSplitFactory();
+
+            // Create a queue with capacity of 1 to trigger full condition immediately
+            AsyncQueue<ConnectorSplit> asyncQueue = new ThrottledAsyncQueue<>(
+                    1000, // maxSplitsPerSecond
+                    1, // maxOutstandingSplits - very small to trigger yielding
+                    Executors.newSingleThreadExecutor());
+
+            // Create multiple partitions to process
+            Deque<HiveHudiPartitionInfo> partitionQueue = new ConcurrentLinkedDeque<>();
+            partitionQueue.add(createTestPartition("partition1"));
+            partitionQueue.add(createTestPartition("partition2"));
+            partitionQueue.add(createTestPartition("partition3"));
+
+            Deque<Iterator<ConnectorSplit>> splitIterators = new ConcurrentLinkedDeque<>();
+
+            HudiPartitionInfoLoader loader = new HudiPartitionInfoLoader(
+                    directoryLister,
+                    COMMIT_TIME,
+                    splitFactory,
+                    asyncQueue,
+                    partitionQueue,
+                    false,
+                    splitIterators);
+
+            // First process() call - should start processing first partition
+            TaskStatus status1 = loader.process();
+
+            // With a queue capacity of 1 and multiple splits per partition,
+            // the loader should yield after adding the first split
+            // The key fix: it returns TaskStatus.continueOn(future) instead of blocking
+            if (!status1.isFinished()) {
+                assertThat(status1.isFinished()).isFalse();
+                // Verify that state was saved - splitIterators should have the current iterator
+                assertThat(splitIterators).isNotEmpty();
+            }
+
+            // Signal to stop processing new partitions
+            loader.stopRunning();
+
+            // Continue processing until finished
+            // The loader should be able to resume and complete without deadlock
+            int maxIterations = 100; // Safety limit to prevent infinite loop in test
+            int iterations = 0;
+            TaskStatus currentStatus = status1;
+
+            while (!currentStatus.isFinished() && iterations < maxIterations) {
+                // Simulate consuming from the queue to make space
+                asyncQueue.getBatchAsync(10);
+
+                // Resume processing
+                currentStatus = loader.process();
+                iterations++;
+            }
+
+            // Verify the loader completed successfully without blocking
+            assertThat(currentStatus.isFinished()).isTrue();
+            assertThat(iterations).isLessThan(maxIterations);
+        }
+    }
+
+    private static HudiSplitFactory createSplitFactory()
+    {
+        HudiTableHandle tableHandle = new HudiTableHandle(
+                "test_schema",
+                "test_table",
+                TABLE_PATH,
+                HoodieTableType.COPY_ON_WRITE,
+                ImmutableList.of(),
+                TupleDomain.all(),
+                TupleDomain.all(),
+                "",
+                "101");
+        HudiSplitWeightProvider weightProvider = new SizeBasedSplitWeightProvider(0.05, DataSize.of(128, MEGABYTE));
+        return new HudiSplitFactory(tableHandle, weightProvider, DataSize.of(128, MEGABYTE), new DefaultCachingHostAddressProvider());
+    }
+
+    private static HiveHudiPartitionInfo createTestPartition(String partitionPath)
+    {
+        SchemaTableName schemaTableName = new SchemaTableName("test_schema", "test_table");
+        Location tableLocation = Location.of(TABLE_PATH);
+        String partitionLocation = TABLE_PATH + "/" + partitionPath;
+
+        Partition partition = Partition.builder()
+                .setDatabaseName("test_schema")
+                .setTableName("test_table")
+                .setValues(ImmutableList.of())
+                .withStorage(storageBuilder -> storageBuilder
+                        .setStorageFormat(StorageFormat.create("org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
+                                "org.apache.hadoop.mapred.TextInputFormat",
+                                "org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat"))
+                        .setLocation(partitionLocation))
+                .setColumns(ImmutableList.of())
+                .build();
+
+        return new HiveHudiPartitionInfo(
+                schemaTableName,
+                tableLocation,
+                partitionPath,
+                partition,
+                ImmutableList.<HiveColumnHandle>of(),
+                TupleDomain.all());
+    }
+
+    private static FileSlice createTestFileSlice(String partitionPath, int fileNumber)
+    {
+        String fileId = "test-file-" + fileNumber;
+        HoodieFileGroupId fileGroupId = new HoodieFileGroupId(partitionPath, fileId);
+        long blockSize = 8L * 1024 * 1024;
+        String baseFilePath = TABLE_PATH + "/" + partitionPath + "/" + fileId + "_" + COMMIT_TIME + ".parquet";
+
+        StoragePathInfo baseFileInfo = new StoragePathInfo(
+                new StoragePath(baseFilePath),
+                DataSize.of(10, MEGABYTE).toBytes(),
+                false,
+                (short) 0,
+                blockSize,
+                System.currentTimeMillis());
+
+        HoodieBaseFile baseFile = new HoodieBaseFile(baseFileInfo);
+        return new FileSlice(fileGroupId, COMMIT_TIME, baseFile, ImmutableList.of());
+    }
+
+    // Test implementation of HudiDirectoryLister that returns empty file slices
+    private static class TestHudiDirectoryLister
+            implements HudiDirectoryLister
+    {
+        @Override
+        public List<FileSlice> listStatus(HudiPartitionInfo partition, boolean useIndex)
+        {
+            // Return empty list for testing
+            return ImmutableList.of();
+        }
+
+        @Override
+        public void close()
+        {
+            // No-op for testing
+        }
+    }
+
+    // Test implementation of HudiDirectoryLister that returns file slices with splits
+    private static class TestHudiDirectoryListerWithSplits
+            implements HudiDirectoryLister
+    {
+        @Override
+        public List<FileSlice> listStatus(HudiPartitionInfo partition, boolean useIndex)
+        {
+            // Return multiple file slices per partition to trigger queue full condition
+            String partitionPath = partition.getRelativePartitionPath();
+            List<FileSlice> fileSlices = new ArrayList<>();
+
+            // Create 3 file slices per partition
+            for (int i = 0; i < 3; i++) {
+                fileSlices.add(createTestFileSlice(partitionPath, i));
+            }
+
+            return fileSlices;
+        }
+
+        @Override
+        public void close()
+        {
+            // No-op for testing
+        }
+    }
+}

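What makes TaskStatus.continueOn(future) effective is the driver behind ResumableTasks.submit, which re-queues a yielded task once its continuation future completes instead of keeping a thread parked on it. The sketch below is a simplified model of that contract, assuming only the shape visible in this diff (process() returning a finished-or-continue status); Trino's actual io.trino.plugin.hive.util.ResumableTasks additionally returns the completion future used above with addExceptionCallback and forwards task failures, so the Task and Status types here are illustrative stand-ins.

import com.google.common.util.concurrent.ListenableFuture;

import java.util.concurrent.Executor;

public class ResumableTaskRunnerSketch
{
    public interface Task
    {
        Status process();
    }

    public record Status(boolean isFinished, ListenableFuture<?> continuation) {}

    // Start the task on the executor; each yield re-submits it when its
    // continuation future fires, so no executor thread ever blocks.
    public static void submit(Executor executor, Task task)
    {
        executor.execute(() -> run(executor, task));
    }

    private static void run(Executor executor, Task task)
    {
        Status status = task.process();
        if (!status.isFinished()) {
            // Reschedule on the executor once the queue (or other resource)
            // signals readiness; the current thread returns to the pool now.
            status.continuation().addListener(() -> run(executor, task), executor);
        }
    }
}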