This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch btree_multi_thread
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/btree_multi_thread by this
push:
new 46c8a58118 [core] Fix mixed nested compound predicate deadlock in
parallel evaluation
46c8a58118 is described below
commit 46c8a58118c1292ff0f0ad9d009169a2e64b5ff0
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 25 13:10:47 2026 +0800
[core] Fix mixed nested compound predicate deadlock in parallel evaluation
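Compound children (e.g. OR under AND) submitted to the executor now
evaluate their own children sequentially instead of submitting them back
to the same pool. Previously each such child occupied a worker thread
while waiting on futures for its own children; with a bounded thread pool
those recursive submissions could leave no free worker to run them,
causing a deadlock.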
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
.../paimon/globalindex/GlobalIndexEvaluator.java | 9 +++-
.../globalindex/GlobalIndexEvaluatorTest.java | 40 ++++++++++++++++++
.../pypaimon/globalindex/global_index_evaluator.py | 10 ++++-
.../pypaimon/tests/global_index_evaluator_test.py | 49 ++++++++++++++++++++++
4 files changed, 106 insertions(+), 2 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 64c4c9f12f..ed04313649 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -156,7 +156,7 @@ public class GlobalIndexEvaluator
List<Predicate> children = flattenChildren(predicate);
List<Future<Optional<GlobalIndexResult>>> futures = new
ArrayList<>(children.size());
for (Predicate child : children) {
- futures.add(executorService.submit(() -> child.visit(this)));
+ futures.add(executorService.submit(() ->
evaluateChildSequentially(child)));
}
List<Optional<GlobalIndexResult>> results = new
ArrayList<>(children.size());
@@ -205,6 +205,13 @@ public class GlobalIndexEvaluator
}
}
+ private Optional<GlobalIndexResult> evaluateChildSequentially(Predicate
child) {
+ if (child instanceof CompoundPredicate) {
+ return visitSequential((CompoundPredicate) child);
+ }
+ return child.visit(this);
+ }
+
private List<Predicate> flattenChildren(CompoundPredicate predicate) {
List<Predicate> result = new ArrayList<>();
for (Predicate child : predicate.children()) {
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
index ac90fb61c6..e7e28e16e0 100644
---
a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
@@ -332,6 +332,46 @@ class GlobalIndexEvaluatorTest {
evaluator.close();
}
+ @Test
+ void testMixedNestedPredicateDoesNotDeadlockWithSmallPool() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+ GlobalIndexResult resultB = resultOf(3, 4, 5, 6, 7);
+ GlobalIndexResult resultC = resultOf(1, 2, 3, 10, 11);
+ // Field c used for second OR child - distinct from field a/b
+
+ ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new
ConcurrentHashMap<>();
+ fieldResults.put(0, resultA);
+ fieldResults.put(1, resultB);
+ fieldResults.put(2, resultC);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
+ Collections.singletonList(
+
readerReturning(fieldResults.get(fieldId))),
+ executor);
+
+ // AND(OR(a, b), OR(a, c)) — mixed nesting, different compound types
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ PredicateBuilder.and(
+ PredicateBuilder.or(builder.equal(0, 1),
builder.equal(1, 2)),
+ PredicateBuilder.or(builder.equal(0, 3),
builder.equal(2, 4)));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ // OR(a, b) = union({1..5}, {3..7}) = {1..7}
+ // OR(a, c) = union({1..5}, {1,2,3,10,11}) = {1,2,3,4,5,10,11}
+ // AND = intersection = {1,2,3,4,5}
+ assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L, 4L,
5L);
+ evaluator.close();
+ }
+
@Test
void testNullPredicate() {
RowType rowType = rowType();
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index 8c899554ea..d8b45e9e86 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -111,8 +111,16 @@ class GlobalIndexEvaluator:
result.append(child)
return result
+ def _evaluate_child_sequentially(self, child):
+ if isinstance(child, Predicate) and child.method in ('and', 'or'):
+ if child.method == 'and':
+ return self._visit_and_sequential(child.literals)
+ else:
+ return self._visit_or_sequential(child.literals)
+ return self._visit_predicate(child)
+
def _submit_children(self, children) -> List[Future]:
- return [self._executor.submit(self._visit_predicate, child) for child
in children]
+ return [self._executor.submit(self._evaluate_child_sequentially,
child) for child in children]
def _collect_results(self, futures: List[Future]) ->
List[Optional[GlobalIndexResult]]:
return [f.result() for f in futures]
diff --git a/paimon-python/pypaimon/tests/global_index_evaluator_test.py
b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
index b62a4fbe02..e59038849c 100644
--- a/paimon-python/pypaimon/tests/global_index_evaluator_test.py
+++ b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
@@ -300,6 +300,55 @@ class GlobalIndexEvaluatorTest(unittest.TestCase):
evaluator.close()
executor.shutdown(wait=False)
+ def test_mixed_nested_does_not_deadlock_with_small_pool(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 5))
+ result_b = GlobalIndexResult.from_range(Range(3, 7))
+ result_c = GlobalIndexResult.from_range(Range(1, 3))
+
+ field_results = {0: result_a, 1: result_b, 2: result_c}
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [StubGlobalIndexReader(field_results[field.id])],
+ executor,
+ )
+
+ # AND(OR(a, b), OR(a, c)) — mixed nesting
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[1]),
+ Predicate(method='equal', index=1, field='b',
literals=[2]),
+ ],
+ ),
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[3]),
+ Predicate(method='equal', index=2, field='c',
literals=[4]),
+ ],
+ ),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ bm = result.results()
+ # OR(a, b) = union([1,5], [3,7]) = [1,7]
+ # OR(a, c) = union([1,5], [1,3]) = [1,5]
+ # AND = intersection = [1,5]
+ self.assertEqual(bm.cardinality(), 5)
+ for v in [1, 2, 3, 4, 5]:
+ self.assertTrue(bm.contains(v))
+ evaluator.close()
+ executor.shutdown(wait=False)
+
def test_null_predicate(self):
fields = _make_fields()
evaluator = GlobalIndexEvaluator(fields, lambda field: [])