This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch btree_multi_thread
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/btree_multi_thread by this
push:
new 1de3eb2a1c [core] Make parallel evaluation field-aware to prevent
concurrent reader access
1de3eb2a1c is described below
commit 1de3eb2a1c92e13673ae77a6e0dbe25947f8c694
Author: JingsongLi <[email protected]>
AuthorDate: Mon May 25 13:36:05 2026 +0800
[core] Make parallel evaluation field-aware to prevent concurrent reader
access
Group flattened children by field before parallel submission so that
leaf predicates referencing the same field are evaluated sequentially
within a single task. This prevents concurrent access to non-thread-safe
GlobalIndexReader instances while preserving parallelism across fields.
Nested compound children are not grouped by field: each one is still
submitted as its own task.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---
.../paimon/globalindex/GlobalIndexEvaluator.java | 58 ++++++++++++++++++++--
.../globalindex/GlobalIndexEvaluatorTest.java | 42 ++++++++++++++++
.../pypaimon/globalindex/global_index_evaluator.py | 48 ++++++++++++++++--
.../pypaimon/tests/global_index_evaluator_test.py | 52 +++++++++++++++++++
4 files changed, 193 insertions(+), 7 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 7d86a5c58e..06431ab229 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -32,6 +32,7 @@ import javax.annotation.Nullable;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -154,9 +155,11 @@ public class GlobalIndexEvaluator
private Optional<GlobalIndexResult> visitParallel(CompoundPredicate
predicate) {
List<Predicate> children = flattenChildren(predicate);
- List<Future<Optional<GlobalIndexResult>>> futures = new
ArrayList<>(children.size());
- for (Predicate child : children) {
- futures.add(executorService.submit(() ->
evaluateWithoutParallel(child)));
+ List<List<Predicate>> groups = groupByField(children);
+ List<Future<Optional<GlobalIndexResult>>> futures = new
ArrayList<>(groups.size());
+ for (List<Predicate> group : groups) {
+ futures.add(
+ executorService.submit(() ->
evaluateGroupWithoutParallel(group, predicate)));
}
List<Optional<GlobalIndexResult>> results = new
ArrayList<>(children.size());
@@ -205,6 +208,55 @@ public class GlobalIndexEvaluator
}
}
+ private List<List<Predicate>> groupByField(List<Predicate> children) {
+ Map<String, List<Predicate>> fieldGroups = new HashMap<>();
+ List<List<Predicate>> result = new ArrayList<>();
+ for (Predicate child : children) {
+ if (child instanceof LeafPredicate) {
+ String fieldName = ((LeafPredicate) child).fieldName();
+ fieldGroups.computeIfAbsent(fieldName, k -> new
ArrayList<>()).add(child);
+ } else {
+ result.add(Collections.singletonList(child));
+ }
+ }
+ result.addAll(fieldGroups.values());
+ return result;
+ }
+
+ private Optional<GlobalIndexResult> evaluateGroupWithoutParallel(
+ List<Predicate> group, CompoundPredicate parent) {
+ if (group.size() == 1) {
+ return evaluateWithoutParallel(group.get(0));
+ }
+ if (parent.function() instanceof Or) {
+ GlobalIndexResult compoundResult = GlobalIndexResult.createEmpty();
+ for (Predicate child : group) {
+ Optional<GlobalIndexResult> childResult =
evaluateWithoutParallel(child);
+ if (!childResult.isPresent()) {
+ return Optional.empty();
+ }
+ compoundResult = compoundResult.or(childResult.get());
+ }
+ return Optional.of(compoundResult);
+ } else {
+ Optional<GlobalIndexResult> compoundResult = Optional.empty();
+ for (Predicate child : group) {
+ Optional<GlobalIndexResult> childResult =
evaluateWithoutParallel(child);
+ if (childResult.isPresent()) {
+ if (compoundResult.isPresent()) {
+ compoundResult =
Optional.of(compoundResult.get().and(childResult.get()));
+ } else {
+ compoundResult = childResult;
+ }
+ }
+ if (compoundResult.isPresent() &&
compoundResult.get().results().isEmpty()) {
+ return compoundResult;
+ }
+ }
+ return compoundResult;
+ }
+ }
+
private Optional<GlobalIndexResult> evaluateWithoutParallel(Predicate
predicate) {
if (predicate instanceof LeafPredicate) {
return visit((LeafPredicate) predicate);
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
index 10a2577b94..822145ca8e 100644
---
a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
@@ -415,6 +415,48 @@ class GlobalIndexEvaluatorTest {
evaluator.close();
}
+ @Test
+ void testSameFieldPredicatesNotAccessedConcurrently() {
+ executor = Executors.newFixedThreadPool(4);
+ RowType rowType = rowType();
+
+ AtomicInteger concurrency = new AtomicInteger(0);
+ AtomicInteger maxConcurrency = new AtomicInteger(0);
+
+ GlobalIndexReader concurrencyDetectingReader =
+ new StubGlobalIndexReader(resultOf(1, 2, 3, 4, 5)) {
+ @Override
+ public Optional<GlobalIndexResult> visitEqual(
+ FieldRef fieldRef, Object literal) {
+ int c = concurrency.incrementAndGet();
+ maxConcurrency.updateAndGet(cur -> Math.max(cur, c));
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ concurrency.decrementAndGet();
+ return super.visitEqual(fieldRef, literal);
+ }
+ };
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
Collections.singletonList(concurrencyDetectingReader),
+ executor);
+
+ // AND(a=1, a=2, a=3) — all same field, must not run concurrently
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ PredicateBuilder.and(builder.equal(0, 1), builder.equal(0, 2),
builder.equal(0, 3));
+
+ evaluator.evaluate(predicate);
+
+ assertThat(maxConcurrency.get()).isEqualTo(1);
+ evaluator.close();
+ }
+
@Test
void testNullPredicate() {
RowType rowType = rowType();
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index 5ca980e218..9f81487bec 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -135,15 +135,54 @@ class GlobalIndexEvaluator:
compound_result = compound_result.or_(child_result)
return compound_result
- def _submit_children(self, children) -> List[Future]:
- return [self._executor.submit(self._evaluate_without_parallel, child)
for child in children]
+ def _group_by_field(self, children) -> list:
+ field_groups = {}
+ result = []
+ for child in children:
+ if isinstance(child, Predicate) and child.method not in ('and',
'or'):
+ field_name = child.field
+ if field_name not in field_groups:
+ field_groups[field_name] = []
+ field_groups[field_name].append(child)
+ else:
+ result.append([child])
+ result.extend(field_groups.values())
+ return result
+
+ def _evaluate_group_without_parallel(self, group, method) ->
Optional[GlobalIndexResult]:
+ if len(group) == 1:
+ return self._evaluate_without_parallel(group[0])
+ if method == 'or':
+ compound_result = GlobalIndexResult.create_empty()
+ for child in group:
+ child_result = self._evaluate_without_parallel(child)
+ if child_result is None:
+ return None
+ compound_result = compound_result.or_(child_result)
+ return compound_result
+ else:
+ compound_result = None
+ for child in group:
+ child_result = self._evaluate_without_parallel(child)
+ if child_result is not None:
+ if compound_result is not None:
+ compound_result = compound_result.and_(child_result)
+ else:
+ compound_result = child_result
+ if compound_result is not None and compound_result.is_empty():
+ return compound_result
+ return compound_result
+
+ def _submit_groups(self, groups, method) -> List[Future]:
+ return [self._executor.submit(self._evaluate_group_without_parallel,
group, method) for group in groups]
def _collect_results(self, futures: List[Future]) ->
List[Optional[GlobalIndexResult]]:
return [f.result() for f in futures]
def _visit_and_parallel(self, children) -> Optional[GlobalIndexResult]:
children = self._flatten_children('and', children)
- results = self._collect_results(self._submit_children(children))
+ groups = self._group_by_field(children)
+ results = self._collect_results(self._submit_groups(groups, 'and'))
compound_result: Optional[GlobalIndexResult] = None
for child_result in results:
@@ -160,7 +199,8 @@ class GlobalIndexEvaluator:
def _visit_or_parallel(self, children) -> Optional[GlobalIndexResult]:
children = self._flatten_children('or', children)
- results = self._collect_results(self._submit_children(children))
+ groups = self._group_by_field(children)
+ results = self._collect_results(self._submit_groups(groups, 'or'))
compound_result = GlobalIndexResult.create_empty()
for child_result in results:
diff --git a/paimon-python/pypaimon/tests/global_index_evaluator_test.py
b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
index 5dfc39f0cc..f5ae519360 100644
--- a/paimon-python/pypaimon/tests/global_index_evaluator_test.py
+++ b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
@@ -410,6 +410,58 @@ class GlobalIndexEvaluatorTest(unittest.TestCase):
evaluator.close()
executor.shutdown(wait=False)
+ def test_same_field_predicates_not_accessed_concurrently(self):
+ import threading
+ import time
+
+ fields = _make_fields()
+
+ concurrency = [0]
+ max_concurrency = [0]
+ lock = threading.Lock()
+
+ class ConcurrencyDetectingReader(GlobalIndexReader):
+ def __init__(self, result):
+ self._result = result
+
+ def visit_equal(self, field_ref, literal):
+ with lock:
+ concurrency[0] += 1
+ max_concurrency[0] = max(max_concurrency[0],
concurrency[0])
+ time.sleep(0.05)
+ with lock:
+ concurrency[0] -= 1
+ return self._result
+
+ def close(self):
+ pass
+
+ result_a = GlobalIndexResult.from_range(Range(1, 5))
+ reader = ConcurrencyDetectingReader(result_a)
+
+ executor = ThreadPoolExecutor(max_workers=4)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [reader],
+ executor,
+ )
+
+ # AND(a=1, a=2, a=3) — all same field, must not run concurrently
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a', literals=[1]),
+ Predicate(method='equal', index=0, field='a', literals=[2]),
+ Predicate(method='equal', index=0, field='a', literals=[3]),
+ ],
+ )
+
+ evaluator.evaluate(predicate)
+
+ self.assertEqual(max_concurrency[0], 1)
+ evaluator.close()
+ executor.shutdown(wait=False)
+
def test_null_predicate(self):
fields = _make_fields()
evaluator = GlobalIndexEvaluator(fields, lambda field: [])