This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch btree_multi_thread
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/btree_multi_thread by this 
push:
     new 5b489c3002 [core] Make parallel evaluation fully non-recursive to 
prevent deep nested deadlock
5b489c3002 is described below

commit 5b489c30027d141fe9cf0148dc7a5428fa492d01
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 25 13:20:23 2026 +0800

    [core] Make parallel evaluation fully non-recursive to prevent deep nested 
deadlock
    
    Replace evaluateChildSequentially with evaluateWithoutParallel, which
    evaluates the entire subtree recursively on the calling thread and never
    submits work to the executor ("non-recursive" in the subject refers to
    executor task submission, not to the tree traversal itself). This
    prevents deadlock for arbitrarily deep mixed compound predicates such as
    AND(OR(AND(a,b),c), OR(AND(d,e),f)).
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 .../paimon/globalindex/GlobalIndexEvaluator.java   | 37 +++++++++++--
 .../globalindex/GlobalIndexEvaluatorTest.java      | 43 +++++++++++++++
 .../pypaimon/globalindex/global_index_evaluator.py | 32 +++++++++---
 .../pypaimon/tests/global_index_evaluator_test.py  | 61 ++++++++++++++++++++++
 4 files changed, 160 insertions(+), 13 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index ed04313649..7d86a5c58e 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -156,7 +156,7 @@ public class GlobalIndexEvaluator
         List<Predicate> children = flattenChildren(predicate);
         List<Future<Optional<GlobalIndexResult>>> futures = new 
ArrayList<>(children.size());
         for (Predicate child : children) {
-            futures.add(executorService.submit(() -> 
evaluateChildSequentially(child)));
+            futures.add(executorService.submit(() -> 
evaluateWithoutParallel(child)));
         }
 
         List<Optional<GlobalIndexResult>> results = new 
ArrayList<>(children.size());
@@ -205,11 +205,38 @@ public class GlobalIndexEvaluator
         }
     }
 
-    private Optional<GlobalIndexResult> evaluateChildSequentially(Predicate 
child) {
-        if (child instanceof CompoundPredicate) {
-            return visitSequential((CompoundPredicate) child);
+    private Optional<GlobalIndexResult> evaluateWithoutParallel(Predicate 
predicate) {
+        if (predicate instanceof LeafPredicate) {
+            return visit((LeafPredicate) predicate);
+        }
+        CompoundPredicate compound = (CompoundPredicate) predicate;
+        if (compound.function() instanceof Or) {
+            GlobalIndexResult compoundResult = GlobalIndexResult.createEmpty();
+            for (Predicate child : compound.children()) {
+                Optional<GlobalIndexResult> childResult = 
evaluateWithoutParallel(child);
+                if (!childResult.isPresent()) {
+                    return Optional.empty();
+                }
+                compoundResult = compoundResult.or(childResult.get());
+            }
+            return Optional.of(compoundResult);
+        } else {
+            Optional<GlobalIndexResult> compoundResult = Optional.empty();
+            for (Predicate child : compound.children()) {
+                Optional<GlobalIndexResult> childResult = 
evaluateWithoutParallel(child);
+                if (childResult.isPresent()) {
+                    if (compoundResult.isPresent()) {
+                        compoundResult = 
Optional.of(compoundResult.get().and(childResult.get()));
+                    } else {
+                        compoundResult = childResult;
+                    }
+                }
+                if (compoundResult.isPresent() && 
compoundResult.get().results().isEmpty()) {
+                    return compoundResult;
+                }
+            }
+            return compoundResult;
         }
-        return child.visit(this);
     }
 
     private List<Predicate> flattenChildren(CompoundPredicate predicate) {
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
index e7e28e16e0..10a2577b94 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
@@ -372,6 +372,49 @@ class GlobalIndexEvaluatorTest {
         evaluator.close();
     }
 
+    @Test
+    void testDeepMixedNestedPredicateDoesNotDeadlockWithSmallPool() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+        GlobalIndexResult resultB = resultOf(2, 3, 4, 5, 6);
+        GlobalIndexResult resultC = resultOf(3, 4, 5, 6, 7);
+
+        ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new 
ConcurrentHashMap<>();
+        fieldResults.put(0, resultA);
+        fieldResults.put(1, resultB);
+        fieldResults.put(2, resultC);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        
readerReturning(fieldResults.get(fieldId))),
+                        executor);
+
+        // AND(OR(AND(a, b), c), OR(AND(a, c), b)) — deep mixed nesting
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(
+                        PredicateBuilder.or(
+                                PredicateBuilder.and(builder.equal(0, 1), 
builder.equal(1, 2)),
+                                builder.equal(2, 3)),
+                        PredicateBuilder.or(
+                                PredicateBuilder.and(builder.equal(0, 4), 
builder.equal(2, 5)),
+                                builder.equal(1, 6)));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        // OR(AND(a,b), c): AND(a,b)={2,3,4,5}, c={3..7} => union={2,3,4,5,6,7}
+        // OR(AND(a,c), b): AND(a,c)={3,4,5}, b={2..6} => union={2,3,4,5,6}
+        // top AND: intersection = {2,3,4,5,6}
+        assertBitmapContainsExactly(result.get().results(), 2L, 3L, 4L, 5L, 
6L);
+        evaluator.close();
+    }
+
     @Test
     void testNullPredicate() {
         RowType rowType = rowType();
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py 
b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index d8b45e9e86..5ca980e218 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -111,16 +111,32 @@ class GlobalIndexEvaluator:
                 result.append(child)
         return result
 
-    def _evaluate_child_sequentially(self, child):
-        if isinstance(child, Predicate) and child.method in ('and', 'or'):
-            if child.method == 'and':
-                return self._visit_and_sequential(child.literals)
-            else:
-                return self._visit_or_sequential(child.literals)
-        return self._visit_predicate(child)
+    def _evaluate_without_parallel(self, predicate):
+        if not isinstance(predicate, Predicate) or predicate.method not in 
('and', 'or'):
+            return self._visit_predicate(predicate)
+        if predicate.method == 'and':
+            compound_result = None
+            for child in predicate.literals:
+                child_result = self._evaluate_without_parallel(child)
+                if child_result is not None:
+                    if compound_result is not None:
+                        compound_result = compound_result.and_(child_result)
+                    else:
+                        compound_result = child_result
+                if compound_result is not None and compound_result.is_empty():
+                    return compound_result
+            return compound_result
+        else:
+            compound_result = GlobalIndexResult.create_empty()
+            for child in predicate.literals:
+                child_result = self._evaluate_without_parallel(child)
+                if child_result is None:
+                    return None
+                compound_result = compound_result.or_(child_result)
+            return compound_result
 
     def _submit_children(self, children) -> List[Future]:
-        return [self._executor.submit(self._evaluate_child_sequentially, 
child) for child in children]
+        return [self._executor.submit(self._evaluate_without_parallel, child) 
for child in children]
 
     def _collect_results(self, futures: List[Future]) -> 
List[Optional[GlobalIndexResult]]:
         return [f.result() for f in futures]
diff --git a/paimon-python/pypaimon/tests/global_index_evaluator_test.py 
b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
index e59038849c..5dfc39f0cc 100644
--- a/paimon-python/pypaimon/tests/global_index_evaluator_test.py
+++ b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
@@ -349,6 +349,67 @@ class GlobalIndexEvaluatorTest(unittest.TestCase):
         evaluator.close()
         executor.shutdown(wait=False)
 
+    def test_deep_mixed_nested_does_not_deadlock_with_small_pool(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 5))
+        result_b = GlobalIndexResult.from_range(Range(2, 6))
+        result_c = GlobalIndexResult.from_range(Range(3, 7))
+
+        field_results = {0: result_a, 1: result_b, 2: result_c}
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(field_results[field.id])],
+            executor,
+        )
+
+        # AND(OR(AND(a, b), c), OR(AND(a, c), b)) — deep mixed nesting
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(
+                            method='and', index=None, field=None,
+                            literals=[
+                                Predicate(method='equal', index=0, field='a', 
literals=[1]),
+                                Predicate(method='equal', index=1, field='b', 
literals=[2]),
+                            ],
+                        ),
+                        Predicate(method='equal', index=2, field='c', 
literals=[3]),
+                    ],
+                ),
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(
+                            method='and', index=None, field=None,
+                            literals=[
+                                Predicate(method='equal', index=0, field='a', 
literals=[4]),
+                                Predicate(method='equal', index=2, field='c', 
literals=[5]),
+                            ],
+                        ),
+                        Predicate(method='equal', index=1, field='b', 
literals=[6]),
+                    ],
+                ),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        bm = result.results()
+        # OR(AND(a,b), c): AND(a,b)=intersect([1,5],[2,6])=[2,5], c=[3,7] => 
union=[2,7]
+        # OR(AND(a,c), b): AND(a,c)=intersect([1,5],[3,7])=[3,5], b=[2,6] => 
union=[2,6]
+        # top AND: intersect([2,7],[2,6]) = [2,6]
+        self.assertEqual(bm.cardinality(), 5)
+        for v in [2, 3, 4, 5, 6]:
+            self.assertTrue(bm.contains(v))
+        evaluator.close()
+        executor.shutdown(wait=False)
+
     def test_null_predicate(self):
         fields = _make_fields()
         evaluator = GlobalIndexEvaluator(fields, lambda field: [])

Reply via email to