This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch btree_multi_thread
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/btree_multi_thread by this push:
     new 1de3eb2a1c [core] Make parallel evaluation field-aware to prevent concurrent reader access
1de3eb2a1c is described below

commit 1de3eb2a1c92e13673ae77a6e0dbe25947f8c694
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 25 13:36:05 2026 +0800

    [core] Make parallel evaluation field-aware to prevent concurrent reader access
    
    Group flattened children by field before parallel submission so that
    leaf predicates referencing the same field are evaluated sequentially
    within a single task. This prevents concurrent access to non-thread-safe
    GlobalIndexReader instances while preserving parallelism across fields.
    Children that are not leaf predicates are not grouped; each one is still
    submitted as its own task.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
 .../paimon/globalindex/GlobalIndexEvaluator.java   | 58 ++++++++++++++++++++--
 .../globalindex/GlobalIndexEvaluatorTest.java      | 42 ++++++++++++++++
 .../pypaimon/globalindex/global_index_evaluator.py | 48 ++++++++++++++++--
 .../pypaimon/tests/global_index_evaluator_test.py  | 52 +++++++++++++++++++
 4 files changed, 193 insertions(+), 7 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 7d86a5c58e..06431ab229 100644
--- a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++ b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
 import java.io.Closeable;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -154,9 +155,11 @@ public class GlobalIndexEvaluator
 
     private Optional<GlobalIndexResult> visitParallel(CompoundPredicate 
predicate) {
         List<Predicate> children = flattenChildren(predicate);
-        List<Future<Optional<GlobalIndexResult>>> futures = new 
ArrayList<>(children.size());
-        for (Predicate child : children) {
-            futures.add(executorService.submit(() -> 
evaluateWithoutParallel(child)));
+        List<List<Predicate>> groups = groupByField(children);
+        List<Future<Optional<GlobalIndexResult>>> futures = new 
ArrayList<>(groups.size());
+        for (List<Predicate> group : groups) {
+            futures.add(
+                    executorService.submit(() -> 
evaluateGroupWithoutParallel(group, predicate)));
         }
 
         List<Optional<GlobalIndexResult>> results = new 
ArrayList<>(children.size());
@@ -205,6 +208,55 @@ public class GlobalIndexEvaluator
         }
     }
 
+    private List<List<Predicate>> groupByField(List<Predicate> children) {
+        Map<String, List<Predicate>> fieldGroups = new HashMap<>();
+        List<List<Predicate>> result = new ArrayList<>();
+        for (Predicate child : children) {
+            if (child instanceof LeafPredicate) {
+                String fieldName = ((LeafPredicate) child).fieldName();
+                fieldGroups.computeIfAbsent(fieldName, k -> new 
ArrayList<>()).add(child);
+            } else {
+                result.add(Collections.singletonList(child));
+            }
+        }
+        result.addAll(fieldGroups.values());
+        return result;
+    }
+
+    private Optional<GlobalIndexResult> evaluateGroupWithoutParallel(
+            List<Predicate> group, CompoundPredicate parent) {
+        if (group.size() == 1) {
+            return evaluateWithoutParallel(group.get(0));
+        }
+        if (parent.function() instanceof Or) {
+            GlobalIndexResult compoundResult = GlobalIndexResult.createEmpty();
+            for (Predicate child : group) {
+                Optional<GlobalIndexResult> childResult = 
evaluateWithoutParallel(child);
+                if (!childResult.isPresent()) {
+                    return Optional.empty();
+                }
+                compoundResult = compoundResult.or(childResult.get());
+            }
+            return Optional.of(compoundResult);
+        } else {
+            Optional<GlobalIndexResult> compoundResult = Optional.empty();
+            for (Predicate child : group) {
+                Optional<GlobalIndexResult> childResult = 
evaluateWithoutParallel(child);
+                if (childResult.isPresent()) {
+                    if (compoundResult.isPresent()) {
+                        compoundResult = 
Optional.of(compoundResult.get().and(childResult.get()));
+                    } else {
+                        compoundResult = childResult;
+                    }
+                }
+                if (compoundResult.isPresent() && 
compoundResult.get().results().isEmpty()) {
+                    return compoundResult;
+                }
+            }
+            return compoundResult;
+        }
+    }
+
     private Optional<GlobalIndexResult> evaluateWithoutParallel(Predicate 
predicate) {
         if (predicate instanceof LeafPredicate) {
             return visit((LeafPredicate) predicate);
diff --git a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
index 10a2577b94..822145ca8e 100644
--- a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
@@ -415,6 +415,48 @@ class GlobalIndexEvaluatorTest {
         evaluator.close();
     }
 
+    @Test
+    void testSameFieldPredicatesNotAccessedConcurrently() {
+        executor = Executors.newFixedThreadPool(4);
+        RowType rowType = rowType();
+
+        AtomicInteger concurrency = new AtomicInteger(0);
+        AtomicInteger maxConcurrency = new AtomicInteger(0);
+
+        GlobalIndexReader concurrencyDetectingReader =
+                new StubGlobalIndexReader(resultOf(1, 2, 3, 4, 5)) {
+                    @Override
+                    public Optional<GlobalIndexResult> visitEqual(
+                            FieldRef fieldRef, Object literal) {
+                        int c = concurrency.incrementAndGet();
+                        maxConcurrency.updateAndGet(cur -> Math.max(cur, c));
+                        try {
+                            Thread.sleep(50);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        }
+                        concurrency.decrementAndGet();
+                        return super.visitEqual(fieldRef, literal);
+                    }
+                };
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId -> 
Collections.singletonList(concurrencyDetectingReader),
+                        executor);
+
+        // AND(a=1, a=2, a=3) — all same field, must not run concurrently
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(builder.equal(0, 1), builder.equal(0, 2), 
builder.equal(0, 3));
+
+        evaluator.evaluate(predicate);
+
+        assertThat(maxConcurrency.get()).isEqualTo(1);
+        evaluator.close();
+    }
+
     @Test
     void testNullPredicate() {
         RowType rowType = rowType();
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index 5ca980e218..9f81487bec 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -135,15 +135,54 @@ class GlobalIndexEvaluator:
                 compound_result = compound_result.or_(child_result)
             return compound_result
 
-    def _submit_children(self, children) -> List[Future]:
-        return [self._executor.submit(self._evaluate_without_parallel, child) 
for child in children]
+    def _group_by_field(self, children) -> list:
+        field_groups = {}
+        result = []
+        for child in children:
+            if isinstance(child, Predicate) and child.method not in ('and', 
'or'):
+                field_name = child.field
+                if field_name not in field_groups:
+                    field_groups[field_name] = []
+                field_groups[field_name].append(child)
+            else:
+                result.append([child])
+        result.extend(field_groups.values())
+        return result
+
+    def _evaluate_group_without_parallel(self, group, method) -> 
Optional[GlobalIndexResult]:
+        if len(group) == 1:
+            return self._evaluate_without_parallel(group[0])
+        if method == 'or':
+            compound_result = GlobalIndexResult.create_empty()
+            for child in group:
+                child_result = self._evaluate_without_parallel(child)
+                if child_result is None:
+                    return None
+                compound_result = compound_result.or_(child_result)
+            return compound_result
+        else:
+            compound_result = None
+            for child in group:
+                child_result = self._evaluate_without_parallel(child)
+                if child_result is not None:
+                    if compound_result is not None:
+                        compound_result = compound_result.and_(child_result)
+                    else:
+                        compound_result = child_result
+                if compound_result is not None and compound_result.is_empty():
+                    return compound_result
+            return compound_result
+
+    def _submit_groups(self, groups, method) -> List[Future]:
+        return [self._executor.submit(self._evaluate_group_without_parallel, 
group, method) for group in groups]
 
     def _collect_results(self, futures: List[Future]) -> 
List[Optional[GlobalIndexResult]]:
         return [f.result() for f in futures]
 
     def _visit_and_parallel(self, children) -> Optional[GlobalIndexResult]:
         children = self._flatten_children('and', children)
-        results = self._collect_results(self._submit_children(children))
+        groups = self._group_by_field(children)
+        results = self._collect_results(self._submit_groups(groups, 'and'))
 
         compound_result: Optional[GlobalIndexResult] = None
         for child_result in results:
@@ -160,7 +199,8 @@ class GlobalIndexEvaluator:
 
     def _visit_or_parallel(self, children) -> Optional[GlobalIndexResult]:
         children = self._flatten_children('or', children)
-        results = self._collect_results(self._submit_children(children))
+        groups = self._group_by_field(children)
+        results = self._collect_results(self._submit_groups(groups, 'or'))
 
         compound_result = GlobalIndexResult.create_empty()
         for child_result in results:
diff --git a/paimon-python/pypaimon/tests/global_index_evaluator_test.py b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
index 5dfc39f0cc..f5ae519360 100644
--- a/paimon-python/pypaimon/tests/global_index_evaluator_test.py
+++ b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
@@ -410,6 +410,58 @@ class GlobalIndexEvaluatorTest(unittest.TestCase):
         evaluator.close()
         executor.shutdown(wait=False)
 
+    def test_same_field_predicates_not_accessed_concurrently(self):
+        import threading
+        import time
+
+        fields = _make_fields()
+
+        concurrency = [0]
+        max_concurrency = [0]
+        lock = threading.Lock()
+
+        class ConcurrencyDetectingReader(GlobalIndexReader):
+            def __init__(self, result):
+                self._result = result
+
+            def visit_equal(self, field_ref, literal):
+                with lock:
+                    concurrency[0] += 1
+                    max_concurrency[0] = max(max_concurrency[0], 
concurrency[0])
+                time.sleep(0.05)
+                with lock:
+                    concurrency[0] -= 1
+                return self._result
+
+            def close(self):
+                pass
+
+        result_a = GlobalIndexResult.from_range(Range(1, 5))
+        reader = ConcurrencyDetectingReader(result_a)
+
+        executor = ThreadPoolExecutor(max_workers=4)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [reader],
+            executor,
+        )
+
+        # AND(a=1, a=2, a=3) — all same field, must not run concurrently
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(method='equal', index=0, field='a', literals=[1]),
+                Predicate(method='equal', index=0, field='a', literals=[2]),
+                Predicate(method='equal', index=0, field='a', literals=[3]),
+            ],
+        )
+
+        evaluator.evaluate(predicate)
+
+        self.assertEqual(max_concurrency[0], 1)
+        evaluator.close()
+        executor.shutdown(wait=False)
+
     def test_null_predicate(self):
         fields = _make_fields()
         evaluator = GlobalIndexEvaluator(fields, lambda field: [])

Reply via email to