This is an automated email from the ASF dual-hosted git repository. JingsongLi pushed a commit to branch btree_multi_thread in repository https://gitbox.apache.org/repos/asf/paimon.git
commit 08227ab66de072175455ca51fe9f55fb43a73ac4 Author: JingsongLi <[email protected]> AuthorDate: Mon May 25 12:15:37 2026 +0800 [core] Fix recursive thread pool deadlock in GlobalIndexEvaluator parallel evaluation Flatten same-kind compound predicates before parallel submission to prevent deadlock when nested binary-tree predicates (e.g. and(and(a,b),c)) recursively submit tasks to the same bounded executor. Only nesting of the same kind (AND directly under AND, OR directly under OR) is flattened; a compound child of the other kind, e.g. each or(...) in and(or(a,b),or(c,d)), is still submitted as a single task that itself submits to and waits on the same executor, so such predicates can still exhaust a small pool. Co-Authored-By: Claude Opus 4.6 <[email protected]> --- .../paimon/globalindex/GlobalIndexEvaluator.java | 17 ++++- .../globalindex/GlobalIndexEvaluatorTest.java | 70 ++++++++++++++++++ .../pypaimon/globalindex/global_index_evaluator.py | 11 +++ .../pypaimon/tests/global_index_evaluator_test.py | 82 ++++++++++++++++++++++ 4 files changed, 179 insertions(+), 1 deletion(-) diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java index c60e5d6e08..64c4c9f12f 100644 --- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java +++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java @@ -153,7 +153,7 @@ public class GlobalIndexEvaluator } private Optional<GlobalIndexResult> visitParallel(CompoundPredicate predicate) { - List<Predicate> children = predicate.children(); + List<Predicate> children = flattenChildren(predicate); List<Future<Optional<GlobalIndexResult>>> futures = new ArrayList<>(children.size()); for (Predicate child : children) { futures.add(executorService.submit(() -> child.visit(this))); @@ -205,6 +205,21 @@ public class GlobalIndexEvaluator } } + private List<Predicate> flattenChildren(CompoundPredicate predicate) { + List<Predicate> result = new ArrayList<>(); + for (Predicate child : predicate.children()) { + if (child instanceof CompoundPredicate) { + CompoundPredicate compound = (CompoundPredicate) child; + if (compound.function().getClass() == 
predicate.function().getClass()) { + result.addAll(flattenChildren(compound)); + continue; + } + } + result.add(child); + } + return result; + } + public void close() { IOUtils.closeAllQuietly( indexReadersCache.values().stream() diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java index 2a75ae7c10..ac90fb61c6 100644 --- a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java +++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java @@ -262,6 +262,76 @@ class GlobalIndexEvaluatorTest { evaluator.close(); } + @Test + void testNestedAndPredicateDoesNotDeadlockWithSmallPool() { + executor = Executors.newFixedThreadPool(2); + RowType rowType = rowType(); + + GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5); + GlobalIndexResult resultB = resultOf(3, 4, 5, 6, 7); + GlobalIndexResult resultC = resultOf(4, 5, 8, 9); + + ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new ConcurrentHashMap<>(); + fieldResults.put(0, resultA); + fieldResults.put(1, resultB); + fieldResults.put(2, resultC); + + GlobalIndexEvaluator evaluator = + new GlobalIndexEvaluator( + rowType, + fieldId -> + Collections.singletonList( + readerReturning(fieldResults.get(fieldId))), + executor); + + // and(a, b, c) builds as and(and(a, b), c) — nested binary tree + PredicateBuilder builder = new PredicateBuilder(rowType); + Predicate predicate = + PredicateBuilder.and(builder.equal(0, 1), builder.equal(1, 2), builder.equal(2, 3)); + + Optional<GlobalIndexResult> result = evaluator.evaluate(predicate); + + assertThat(result).isPresent(); + // intersection of {1..5}, {3..7}, {4,5,8,9} -> {4,5} + assertBitmapContainsExactly(result.get().results(), 4L, 5L); + evaluator.close(); + } + + @Test + void testNestedOrPredicateDoesNotDeadlockWithSmallPool() { + executor = 
Executors.newFixedThreadPool(2); + RowType rowType = rowType(); + + GlobalIndexResult resultA = resultOf(1, 2); + GlobalIndexResult resultB = resultOf(3, 4); + GlobalIndexResult resultC = resultOf(5, 6); + + ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new ConcurrentHashMap<>(); + fieldResults.put(0, resultA); + fieldResults.put(1, resultB); + fieldResults.put(2, resultC); + + GlobalIndexEvaluator evaluator = + new GlobalIndexEvaluator( + rowType, + fieldId -> + Collections.singletonList( + readerReturning(fieldResults.get(fieldId))), + executor); + + // or(a, b, c) builds as or(or(a, b), c) — nested binary tree + PredicateBuilder builder = new PredicateBuilder(rowType); + Predicate predicate = + PredicateBuilder.or(builder.equal(0, 1), builder.equal(1, 2), builder.equal(2, 3)); + + Optional<GlobalIndexResult> result = evaluator.evaluate(predicate); + + assertThat(result).isPresent(); + // union of {1,2}, {3,4}, {5,6} + assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L, 4L, 5L, 6L); + evaluator.close(); + } + @Test void testNullPredicate() { RowType rowType = rowType(); diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py b/paimon-python/pypaimon/globalindex/global_index_evaluator.py index 564f719cbf..8c899554ea 100644 --- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py +++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py @@ -102,6 +102,15 @@ class GlobalIndexEvaluator: return compound_result + def _flatten_children(self, method: str, children) -> list: + result = [] + for child in children: + if isinstance(child, Predicate) and child.method == method: + result.extend(self._flatten_children(method, child.literals)) + else: + result.append(child) + return result + def _submit_children(self, children) -> List[Future]: return [self._executor.submit(self._visit_predicate, child) for child in children] @@ -109,6 +118,7 @@ class GlobalIndexEvaluator: return [f.result() for f in futures] def 
_visit_and_parallel(self, children) -> Optional[GlobalIndexResult]: + children = self._flatten_children('and', children) results = self._collect_results(self._submit_children(children)) compound_result: Optional[GlobalIndexResult] = None @@ -125,6 +135,7 @@ class GlobalIndexEvaluator: return compound_result def _visit_or_parallel(self, children) -> Optional[GlobalIndexResult]: + children = self._flatten_children('or', children) results = self._collect_results(self._submit_children(children)) compound_result = GlobalIndexResult.create_empty() diff --git a/paimon-python/pypaimon/tests/global_index_evaluator_test.py b/paimon-python/pypaimon/tests/global_index_evaluator_test.py index 80da2a4fee..b62a4fbe02 100644 --- a/paimon-python/pypaimon/tests/global_index_evaluator_test.py +++ b/paimon-python/pypaimon/tests/global_index_evaluator_test.py @@ -218,6 +218,88 @@ class GlobalIndexEvaluatorTest(unittest.TestCase): self.assertEqual(call_count[0], 2) evaluator.close() + def test_nested_and_does_not_deadlock_with_small_pool(self): + fields = _make_fields() + result_a = GlobalIndexResult.from_range(Range(1, 5)) + result_b = GlobalIndexResult.from_range(Range(3, 7)) + result_c = GlobalIndexResult.from_range(Range(4, 5)) + + field_results = {0: result_a, 1: result_b, 2: result_c} + + executor = ThreadPoolExecutor(max_workers=2) + evaluator = GlobalIndexEvaluator( + fields, + lambda field: [StubGlobalIndexReader(field_results[field.id])], + executor, + ) + + # Nested binary tree: and(and(a, b), c) + predicate = Predicate( + method='and', index=None, field=None, + literals=[ + Predicate( + method='and', index=None, field=None, + literals=[ + Predicate(method='equal', index=0, field='a', literals=[1]), + Predicate(method='equal', index=1, field='b', literals=[2]), + ], + ), + Predicate(method='equal', index=2, field='c', literals=[3]), + ], + ) + + result = evaluator.evaluate(predicate) + + self.assertIsNotNone(result) + bm = result.results() + # intersection of [1,5], [3,7], 
[4,5] -> [4,5] + self.assertEqual(bm.cardinality(), 2) + for v in [4, 5]: + self.assertTrue(bm.contains(v)) + evaluator.close() + executor.shutdown(wait=False) + + def test_nested_or_does_not_deadlock_with_small_pool(self): + fields = _make_fields() + result_a = GlobalIndexResult.from_range(Range(1, 2)) + result_b = GlobalIndexResult.from_range(Range(3, 4)) + result_c = GlobalIndexResult.from_range(Range(5, 6)) + + field_results = {0: result_a, 1: result_b, 2: result_c} + + executor = ThreadPoolExecutor(max_workers=2) + evaluator = GlobalIndexEvaluator( + fields, + lambda field: [StubGlobalIndexReader(field_results[field.id])], + executor, + ) + + # Nested binary tree: or(or(a, b), c) + predicate = Predicate( + method='or', index=None, field=None, + literals=[ + Predicate( + method='or', index=None, field=None, + literals=[ + Predicate(method='equal', index=0, field='a', literals=[1]), + Predicate(method='equal', index=1, field='b', literals=[2]), + ], + ), + Predicate(method='equal', index=2, field='c', literals=[3]), + ], + ) + + result = evaluator.evaluate(predicate) + + self.assertIsNotNone(result) + bm = result.results() + # union of [1,2], [3,4], [5,6] -> {1,2,3,4,5,6} + self.assertEqual(bm.cardinality(), 6) + for v in [1, 2, 3, 4, 5, 6]: + self.assertTrue(bm.contains(v)) + evaluator.close() + executor.shutdown(wait=False) + def test_null_predicate(self): fields = _make_fields() evaluator = GlobalIndexEvaluator(fields, lambda field: [])
