This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch btree_multi_thread
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 08227ab66de072175455ca51fe9f55fb43a73ac4
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 25 12:15:37 2026 +0800

    [core] Fix recursive thread pool deadlock in GlobalIndexEvaluator parallel evaluation
    
    Flatten same-kind compound predicates before parallel submission to
    prevent deadlock when nested binary-tree predicates (e.g. and(and(a,b),c))
    recursively submit tasks to the same bounded executor.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 .../paimon/globalindex/GlobalIndexEvaluator.java   | 17 ++++-
 .../globalindex/GlobalIndexEvaluatorTest.java      | 70 ++++++++++++++++++
 .../pypaimon/globalindex/global_index_evaluator.py | 11 +++
 .../pypaimon/tests/global_index_evaluator_test.py  | 82 ++++++++++++++++++++++
 4 files changed, 179 insertions(+), 1 deletion(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index c60e5d6e08..64c4c9f12f 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -153,7 +153,7 @@ public class GlobalIndexEvaluator
     }
 
     private Optional<GlobalIndexResult> visitParallel(CompoundPredicate 
predicate) {
-        List<Predicate> children = predicate.children();
+        List<Predicate> children = flattenChildren(predicate);
         List<Future<Optional<GlobalIndexResult>>> futures = new 
ArrayList<>(children.size());
         for (Predicate child : children) {
             futures.add(executorService.submit(() -> child.visit(this)));
@@ -205,6 +205,21 @@ public class GlobalIndexEvaluator
         }
     }
 
+    /**
+     * Collects the operands of {@code predicate}, recursively inlining every child compound
+     * predicate whose function has the same class as {@code predicate}'s function. For example
+     * {@code and(and(a, b), c)} yields {@code [a, b, c]}. Children that are not compound, or whose
+     * function class differs, are kept as-is in their original left-to-right position.
+     *
+     * <p>Called by {@code visitParallel} before it submits one executor task per child. After
+     * flattening, same-kind nested operands are submitted as individual tasks. Without it, a
+     * nested compound task would run on a pool thread and submit (and presumably wait on) further
+     * tasks in the same bounded pool.
+     *
+     * <p>NOTE(review): only same-kind nesting is flattened. A mixed tree such as
+     * {@code and(or(a, b), or(c, d))} still submits compound children that submit their own
+     * tasks to the same executor. A small bounded pool can therefore presumably still deadlock.
+     * Confirm, and consider evaluating compound children inline and submitting only leaves.
+     *
+     * @param predicate the compound predicate whose children are flattened
+     * @return a new mutable list of the flattened children, in left-to-right order
+     */
+    private List<Predicate> flattenChildren(CompoundPredicate predicate) {
+        List<Predicate> result = new ArrayList<>();
+        for (Predicate child : predicate.children()) {
+            if (child instanceof CompoundPredicate) {
+                CompoundPredicate compound = (CompoundPredicate) child;
+                // Same function class => same kind of compound (e.g. AND nested in AND): inline
+                // its operands. Recursion depth grows with the nesting depth of the tree.
+                if (compound.function().getClass() == 
predicate.function().getClass()) {
+                    result.addAll(flattenChildren(compound));
+                    continue;
+                }
+            }
+            // Leaf predicate, or a compound of a different kind: keep it as a single operand.
+            result.add(child);
+        }
+        return result;
+    }
+
     public void close() {
         IOUtils.closeAllQuietly(
                 indexReadersCache.values().stream()
diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
index 2a75ae7c10..ac90fb61c6 100644
--- a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
@@ -262,6 +262,76 @@ class GlobalIndexEvaluatorTest {
         evaluator.close();
     }
 
+    /**
+     * Regression test: a three-way AND that {@code PredicateBuilder.and} builds as a nested tree,
+     * evaluated on a 2-thread pool, must complete and return the intersection of the per-field
+     * results. {@code resultC} deliberately contains values (8, 9) outside A intersect B, so the
+     * assertion also catches an AND that returns only its last operand.
+     *
+     * <p>NOTE(review): there is no timeout, so a deadlock regression would hang the build rather
+     * than fail this test. Consider adding one (e.g. JUnit 5 {@code @Timeout}, assuming JUnit 5).
+     */
+    @Test
+    void testNestedAndPredicateDoesNotDeadlockWithSmallPool() {
+        // Deliberately tiny pool: fewer threads than the number of nested tasks.
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+        GlobalIndexResult resultB = resultOf(3, 4, 5, 6, 7);
+        GlobalIndexResult resultC = resultOf(4, 5, 8, 9);
+
+        ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new 
ConcurrentHashMap<>();
+        fieldResults.put(0, resultA);
+        fieldResults.put(1, resultB);
+        fieldResults.put(2, resultC);
+
+        // Each field id is served by a single stub reader returning that field's fixed result.
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        
readerReturning(fieldResults.get(fieldId))),
+                        executor);
+
+        // and(a, b, c) builds as and(and(a, b), c) — nested binary tree
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(builder.equal(0, 1), builder.equal(1, 2), 
builder.equal(2, 3));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        // intersection of {1..5}, {3..7}, {4,5,8,9} -> {4,5}
+        assertBitmapContainsExactly(result.get().results(), 4L, 5L);
+        evaluator.close();
+    }
+
+    /**
+     * Regression test: a three-way OR that {@code PredicateBuilder.or} builds as a nested tree,
+     * evaluated on a 2-thread pool, must complete and return the union of the disjoint per-field
+     * results.
+     *
+     * <p>NOTE(review): there is no timeout, so a deadlock regression would hang the build rather
+     * than fail this test. Consider adding one (e.g. JUnit 5 {@code @Timeout}, assuming JUnit 5).
+     */
+    @Test
+    void testNestedOrPredicateDoesNotDeadlockWithSmallPool() {
+        // Deliberately tiny pool: fewer threads than the number of nested tasks.
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2);
+        GlobalIndexResult resultB = resultOf(3, 4);
+        GlobalIndexResult resultC = resultOf(5, 6);
+
+        ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new 
ConcurrentHashMap<>();
+        fieldResults.put(0, resultA);
+        fieldResults.put(1, resultB);
+        fieldResults.put(2, resultC);
+
+        // Each field id is served by a single stub reader returning that field's fixed result.
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        
readerReturning(fieldResults.get(fieldId))),
+                        executor);
+
+        // or(a, b, c) builds as or(or(a, b), c) — nested binary tree
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.or(builder.equal(0, 1), builder.equal(1, 2), 
builder.equal(2, 3));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        // union of {1,2}, {3,4}, {5,6}
+        assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L, 4L, 
5L, 6L);
+        evaluator.close();
+    }
+
     @Test
     void testNullPredicate() {
         RowType rowType = rowType();
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index 564f719cbf..8c899554ea 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -102,6 +102,15 @@ class GlobalIndexEvaluator:
 
         return compound_result
 
+    def _flatten_children(self, method: str, children) -> list:
+        """Flatten nested compound predicates of the same kind.
+
+        Any child that is a ``Predicate`` whose ``method`` equals ``method`` is
+        replaced, recursively, by its own ``literals``. All other children are
+        kept as-is, preserving left-to-right order. For example, with
+        ``method='and'``, the children of ``and(and(a, b), c)`` become
+        ``[a, b, c]``.
+
+        The parallel and/or visitors call this before submitting one executor
+        task per child. Without it, a nested same-kind compound would run on a
+        pool thread and block on ``Future.result()`` for sub-tasks queued to
+        the same pool.
+
+        NOTE(review): mixed nesting (e.g. ``and(or(a, b), or(c, d))``) is not
+        flattened, so a small pool can presumably still be exhausted by
+        nested waits. Confirm.
+
+        Args:
+            method: compound method name to flatten (the callers pass
+                ``'and'`` or ``'or'``).
+            children: the compound predicate's ``literals``.
+
+        Returns:
+            A new list containing the flattened children.
+        """
+        result = []
+        for child in children:
+            if isinstance(child, Predicate) and child.method == method:
+                # Same-kind compound: splice in its operands instead of the node itself.
+                result.extend(self._flatten_children(method, child.literals))
+            else:
+                result.append(child)
+        return result
+
     def _submit_children(self, children) -> List[Future]:
         return [self._executor.submit(self._visit_predicate, child) for child 
in children]
 
@@ -109,6 +118,7 @@ class GlobalIndexEvaluator:
         return [f.result() for f in futures]
 
     def _visit_and_parallel(self, children) -> Optional[GlobalIndexResult]:
+        children = self._flatten_children('and', children)
         results = self._collect_results(self._submit_children(children))
 
         compound_result: Optional[GlobalIndexResult] = None
@@ -125,6 +135,7 @@ class GlobalIndexEvaluator:
         return compound_result
 
     def _visit_or_parallel(self, children) -> Optional[GlobalIndexResult]:
+        children = self._flatten_children('or', children)
         results = self._collect_results(self._submit_children(children))
 
         compound_result = GlobalIndexResult.create_empty()
diff --git a/paimon-python/pypaimon/tests/global_index_evaluator_test.py b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
index 80da2a4fee..b62a4fbe02 100644
--- a/paimon-python/pypaimon/tests/global_index_evaluator_test.py
+++ b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
@@ -218,6 +218,88 @@ class GlobalIndexEvaluatorTest(unittest.TestCase):
         self.assertEqual(call_count[0], 2)
         evaluator.close()
 
+    def test_nested_and_does_not_deadlock_with_small_pool(self):
+        """Nested ``and(and(a, b), c)`` must finish on a 2-worker pool.
+
+        The expected result is the intersection of the three per-field ranges.
+
+        NOTE(review): ``result_c`` ([4, 5]) lies entirely inside the
+        intersection of ``result_a`` and ``result_b``. This test would
+        therefore also pass if AND returned only its last operand. Consider
+        a ``result_c`` with values outside [4, 5], as the Java counterpart
+        does with {4, 5, 8, 9}. There is also no timeout, so a deadlock
+        regression hangs instead of failing. The executor is only shut
+        down when the assertions pass.
+        """
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 5))
+        result_b = GlobalIndexResult.from_range(Range(3, 7))
+        result_c = GlobalIndexResult.from_range(Range(4, 5))
+
+        field_results = {0: result_a, 1: result_b, 2: result_c}
+
+        # Deliberately tiny pool: fewer workers than the number of nested tasks.
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(field_results[field.id])],
+            executor,
+        )
+
+        # Nested binary tree: and(and(a, b), c)
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(
+                    method='and', index=None, field=None,
+                    literals=[
+                        Predicate(method='equal', index=0, field='a', 
literals=[1]),
+                        Predicate(method='equal', index=1, field='b', 
literals=[2]),
+                    ],
+                ),
+                Predicate(method='equal', index=2, field='c', literals=[3]),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        bm = result.results()
+        # intersection of [1,5], [3,7], [4,5] -> [4,5]
+        self.assertEqual(bm.cardinality(), 2)
+        for v in [4, 5]:
+            self.assertTrue(bm.contains(v))
+        evaluator.close()
+        executor.shutdown(wait=False)
+    def test_nested_or_does_not_deadlock_with_small_pool(self):
+        """Nested ``or(or(a, b), c)`` must finish on a 2-worker pool.
+
+        The three per-field ranges are disjoint, so the expected result is
+        their full union of six values.
+
+        NOTE(review): there is no timeout, so a deadlock regression hangs
+        instead of failing. The executor is only shut down when the
+        assertions pass.
+        """
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 2))
+        result_b = GlobalIndexResult.from_range(Range(3, 4))
+        result_c = GlobalIndexResult.from_range(Range(5, 6))
+
+        field_results = {0: result_a, 1: result_b, 2: result_c}
+
+        # Deliberately tiny pool: fewer workers than the number of nested tasks.
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(field_results[field.id])],
+            executor,
+        )
+
+        # Nested binary tree: or(or(a, b), c)
+        predicate = Predicate(
+            method='or', index=None, field=None,
+            literals=[
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(method='equal', index=0, field='a', 
literals=[1]),
+                        Predicate(method='equal', index=1, field='b', 
literals=[2]),
+                    ],
+                ),
+                Predicate(method='equal', index=2, field='c', literals=[3]),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        bm = result.results()
+        # union of [1,2], [3,4], [5,6] -> {1,2,3,4,5,6}
+        self.assertEqual(bm.cardinality(), 6)
+        for v in [1, 2, 3, 4, 5, 6]:
+            self.assertTrue(bm.contains(v))
+        evaluator.close()
+        executor.shutdown(wait=False)
     def test_null_predicate(self):
         fields = _make_fields()
         evaluator = GlobalIndexEvaluator(fields, lambda field: [])

Reply via email to