This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b26aa9f9e6 [core] Parallelize GlobalIndexEvaluator across multiple 
BTree fields (#7951)
b26aa9f9e6 is described below

commit b26aa9f9e6a5aa16590a4b458d824181dd75f0b7
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 25 19:01:06 2026 +0800

    [core] Parallelize GlobalIndexEvaluator across multiple BTree fields (#7951)
    
    When a compound predicate (AND/OR) involves multiple fields, each
    field's BTree index lookup is now submitted to a thread pool and
    executed in parallel. This reduces scan latency for multi-field global
    index queries.
    
    Key changes:
    - GlobalIndexEvaluator accepts an optional ExecutorService for parallel
    child predicate evaluation in visitParallel, with sequential fallback.
    - GlobalIndexScanner creates an isolated evaluator thread pool to avoid
    deadlocks with the existing UnionGlobalIndexReader executor.
    - Python side uses ThreadPoolExecutor with threading.Lock for
    thread-safe reader cache access.
    - Added GlobalIndexEvaluatorTest for both Java and Python.
---
 .../paimon/globalindex/GlobalIndexEvaluator.java   | 170 +++--
 .../paimon/globalindex/UnionGlobalIndexReader.java |  28 +-
 .../globalindex/GlobalIndexEvaluatorTest.java      | 736 +++++++++++++++++++++
 .../paimon/globalindex/GlobalIndexScanner.java     |  13 +-
 .../pypaimon/globalindex/global_index_evaluator.py | 218 ++++--
 .../pypaimon/globalindex/global_index_scanner.py   |  21 +-
 .../pypaimon/tests/global_index_evaluator_test.py  | 660 ++++++++++++++++++
 7 files changed, 1705 insertions(+), 141 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 720cf63fe4..2f07d97fca 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -23,80 +23,148 @@ import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IOUtils;
 
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.Deque;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
 /** Predicate for filtering data using global indexes. */
-public class GlobalIndexEvaluator
-        implements Closeable, PredicateVisitor<Optional<GlobalIndexResult>> {
+public class GlobalIndexEvaluator implements Closeable {
 
     private final RowType rowType;
     private final IntFunction<Collection<GlobalIndexReader>> readersFunction;
-    private final Map<Integer, Collection<GlobalIndexReader>> 
indexReadersCache = new HashMap<>();
+    private final Map<Integer, Collection<GlobalIndexReader>> 
indexReadersCache;
+    private final ExecutorService executorService;
 
     public GlobalIndexEvaluator(
-            RowType rowType, IntFunction<Collection<GlobalIndexReader>> 
readersFunction) {
+            RowType rowType,
+            IntFunction<Collection<GlobalIndexReader>> readersFunction,
+            @Nullable ExecutorService executorService) {
         this.rowType = rowType;
         this.readersFunction = readersFunction;
+        this.executorService =
+                executorService == null ? newDirectExecutorService() : 
executorService;
+        this.indexReadersCache = new ConcurrentHashMap<>();
     }
 
     public Optional<GlobalIndexResult> evaluate(@Nullable Predicate predicate) 
{
-        return predicate == null ? Optional.empty() : predicate.visit(this);
+        if (predicate == null) {
+            return Optional.empty();
+        }
+        try {
+            return visitAsync(predicate).get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException("Interrupted during index evaluation", 
e);
+        } catch (ExecutionException e) {
+            if (e.getCause() instanceof RuntimeException) {
+                throw (RuntimeException) e.getCause();
+            }
+            if (e.getCause() instanceof Error) {
+                throw (Error) e.getCause();
+            }
+            throw new RuntimeException(e.getCause());
+        }
     }
 
-    @Override
-    public Optional<GlobalIndexResult> visit(LeafPredicate predicate) {
+    private CompletableFuture<Optional<GlobalIndexResult>> 
visitAsync(Predicate predicate) {
+        if (predicate instanceof LeafPredicate) {
+            return visitLeafAsync((LeafPredicate) predicate);
+        }
+        return visitCompoundAsync((CompoundPredicate) predicate);
+    }
+
+    private CompletableFuture<Optional<GlobalIndexResult>> 
visitLeafAsync(LeafPredicate predicate) {
         Optional<FieldRef> fieldRefOptional = predicate.fieldRefOptional();
         if (!fieldRefOptional.isPresent()) {
-            return Optional.empty();
+            return CompletableFuture.completedFuture(Optional.empty());
         }
-        Optional<GlobalIndexResult> compoundResult = Optional.empty();
         FieldRef fieldRef = fieldRefOptional.get();
         int fieldId = rowType.getField(fieldRef.name()).id();
         Collection<GlobalIndexReader> readers =
                 indexReadersCache.computeIfAbsent(fieldId, 
readersFunction::apply);
-        for (GlobalIndexReader fileIndexReader : readers) {
-            Optional<GlobalIndexResult> childResult =
-                    predicate.function().visit(fileIndexReader, fieldRef, 
predicate.literals());
-            if (!childResult.isPresent()) {
-                continue;
-            }
 
-            GlobalIndexResult result = childResult.get();
+        List<CompletableFuture<Optional<GlobalIndexResult>>> readerFutures =
+                new ArrayList<>(readers.size());
+        for (GlobalIndexReader reader : readers) {
+            readerFutures.add(
+                    CompletableFuture.supplyAsync(
+                            () -> {
+                                synchronized (reader) {
+                                    Optional<GlobalIndexResult> result =
+                                            predicate
+                                                    .function()
+                                                    .visit(reader, fieldRef, 
predicate.literals());
+                                    
result.ifPresent(GlobalIndexResult::results);
+                                    return result;
+                                }
+                            },
+                            executorService));
+        }
 
-            // AND Operation
-            if (compoundResult.isPresent()) {
-                GlobalIndexResult r1 = compoundResult.get();
-                compoundResult = Optional.of(r1.and(result));
-            } else {
-                compoundResult = Optional.of(result);
-            }
+        return CompletableFuture.allOf(readerFutures.toArray(new 
CompletableFuture[0]))
+                .thenApply(
+                        v -> {
+                            Optional<GlobalIndexResult> compoundResult = 
Optional.empty();
+                            for 
(CompletableFuture<Optional<GlobalIndexResult>> f : readerFutures) {
+                                Optional<GlobalIndexResult> childResult = 
f.join();
+                                if (!childResult.isPresent()) {
+                                    continue;
+                                }
+                                if (compoundResult.isPresent()) {
+                                    compoundResult =
+                                            Optional.of(
+                                                    
compoundResult.get().and(childResult.get()));
+                                } else {
+                                    compoundResult = childResult;
+                                }
+                                if (compoundResult.get().results().isEmpty()) {
+                                    return compoundResult;
+                                }
+                            }
+                            return compoundResult;
+                        });
+    }
 
-            if (compoundResult.get().results().isEmpty()) {
-                return compoundResult;
-            }
-        }
-        return compoundResult;
+    private CompletableFuture<Optional<GlobalIndexResult>> visitCompoundAsync(
+            CompoundPredicate predicate) {
+        List<Predicate> children = flattenChildren(predicate);
+        List<CompletableFuture<Optional<GlobalIndexResult>>> childFutures =
+                
children.stream().map(this::visitAsync).collect(Collectors.toList());
+
+        return CompletableFuture.allOf(childFutures.toArray(new 
CompletableFuture[0]))
+                .thenApply(
+                        v -> {
+                            List<Optional<GlobalIndexResult>> results = new 
ArrayList<>();
+                            for 
(CompletableFuture<Optional<GlobalIndexResult>> f : childFutures) {
+                                results.add(f.join());
+                            }
+                            return combineResults(results, predicate);
+                        });
     }
 
-    @Override
-    public Optional<GlobalIndexResult> visit(CompoundPredicate predicate) {
+    private Optional<GlobalIndexResult> combineResults(
+            List<Optional<GlobalIndexResult>> results, CompoundPredicate 
predicate) {
         if (predicate.function() instanceof Or) {
             GlobalIndexResult compoundResult = GlobalIndexResult.createEmpty();
-            for (Predicate predicate1 : predicate.children()) {
-                Optional<GlobalIndexResult> childResult = 
predicate1.visit(this);
-
+            for (Optional<GlobalIndexResult> childResult : results) {
                 if (!childResult.isPresent()) {
                     return Optional.empty();
                 }
@@ -105,21 +173,14 @@ public class GlobalIndexEvaluator
             return Optional.of(compoundResult);
         } else {
             Optional<GlobalIndexResult> compoundResult = Optional.empty();
-            for (Predicate predicate1 : predicate.children()) {
-                Optional<GlobalIndexResult> childResult = 
predicate1.visit(this);
-
-                // AND Operation
+            for (Optional<GlobalIndexResult> childResult : results) {
                 if (childResult.isPresent()) {
                     if (compoundResult.isPresent()) {
-                        GlobalIndexResult r1 = compoundResult.get();
-                        GlobalIndexResult r2 = childResult.get();
-                        compoundResult = Optional.of(r1.and(r2));
+                        compoundResult = 
Optional.of(compoundResult.get().and(childResult.get()));
                     } else {
                         compoundResult = childResult;
                     }
                 }
-
-                // if not remain, no need to test anymore
                 if (compoundResult.isPresent() && 
compoundResult.get().results().isEmpty()) {
                     return compoundResult;
                 }
@@ -128,6 +189,27 @@ public class GlobalIndexEvaluator
         }
     }
 
+    private List<Predicate> flattenChildren(CompoundPredicate predicate) {
+        List<Predicate> result = new ArrayList<>();
+        Deque<Predicate> stack = new ArrayDeque<>(predicate.children());
+        while (!stack.isEmpty()) {
+            Predicate child = stack.pollFirst();
+            if (child instanceof CompoundPredicate) {
+                CompoundPredicate compound = (CompoundPredicate) child;
+                if (compound.function().equals(predicate.function())) {
+                    List<Predicate> grandChildren = compound.children();
+                    for (int i = grandChildren.size() - 1; i >= 0; i--) {
+                        stack.addFirst(grandChildren.get(i));
+                    }
+                    continue;
+                }
+            }
+            result.add(child);
+        }
+        return result;
+    }
+
+    @Override
     public void close() {
         IOUtils.closeAllQuietly(
                 indexReadersCache.values().stream()
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
index b585a75570..23580b4171 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
@@ -21,20 +21,12 @@ package org.apache.paimon.globalindex;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.VectorSearch;
 
-import javax.annotation.Nullable;
-
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
-import java.util.concurrent.ExecutorService;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.singletonList;
-import static 
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
-
 /**
  * A {@link GlobalIndexReader} that combines results from multiple readers by 
performing a union
  * (OR) operation on their results.
@@ -42,16 +34,9 @@ import static 
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialR
 public class UnionGlobalIndexReader implements GlobalIndexReader {
 
     private final List<GlobalIndexReader> readers;
-    private final @Nullable ExecutorService executor;
 
     public UnionGlobalIndexReader(List<GlobalIndexReader> readers) {
-        this(readers, null);
-    }
-
-    public UnionGlobalIndexReader(
-            List<GlobalIndexReader> readers, @Nullable ExecutorService 
executor) {
         this.readers = readers;
-        this.executor = executor;
     }
 
     @Override
@@ -163,18 +148,7 @@ public class UnionGlobalIndexReader implements 
GlobalIndexReader {
     }
 
     private <R> List<R> executeAllReaders(Function<GlobalIndexReader, R> 
function) {
-        if (executor == null) {
-            return readers.stream().map(function).collect(Collectors.toList());
-        }
-
-        Iterator<R> iterator =
-                randomlyExecuteSequentialReturn(
-                        executor, reader -> 
singletonList(function.apply(reader)), readers);
-        List<R> result = new ArrayList<>();
-        while (iterator.hasNext()) {
-            result.add(iterator.next());
-        }
-        return result;
+        return readers.stream().map(function).collect(Collectors.toList());
     }
 
     @Override
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
new file mode 100644
index 0000000000..0eea0bb02b
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.predicate.And;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link GlobalIndexEvaluator}. */
+class GlobalIndexEvaluatorTest {
+
+    private ExecutorService executor;
+
+    @AfterEach
+    void tearDown() {
+        if (executor != null) {
+            executor.shutdownNow();
+        }
+    }
+
+    private static RowType rowType() {
+        return new RowType(
+                Arrays.asList(
+                        new DataField(0, "a", DataTypes.INT()),
+                        new DataField(1, "b", DataTypes.INT()),
+                        new DataField(2, "c", DataTypes.INT())));
+    }
+
+    private static GlobalIndexResult resultOf(long... rowIds) {
+        return GlobalIndexResult.create(
+                () -> {
+                    RoaringNavigableMap64 bm = new RoaringNavigableMap64();
+                    for (long id : rowIds) {
+                        bm.add(id);
+                    }
+                    return bm;
+                });
+    }
+
+    private static GlobalIndexReader readerReturning(GlobalIndexResult result) 
{
+        return new StubGlobalIndexReader(result);
+    }
+
+    @Test
+    void testSingleFieldSequential() {
+        RowType rowType = rowType();
+        GlobalIndexResult expected = resultOf(1, 2, 3);
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId -> 
Collections.singletonList(readerReturning(expected)),
+                        null);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate = builder.equal(0, 42);
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L);
+        evaluator.close();
+    }
+
+    @Test
+    void testAndParallelMultipleFields() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+        GlobalIndexResult resultB = resultOf(3, 4, 5, 6, 7);
+
+        ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new 
ConcurrentHashMap<>();
+        fieldResults.put(0, resultA);
+        fieldResults.put(1, resultB);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        
readerReturning(fieldResults.get(fieldId))),
+                        executor);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate = PredicateBuilder.and(builder.equal(0, 42), 
builder.equal(1, 99));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        assertBitmapContainsExactly(result.get().results(), 3L, 4L, 5L);
+        evaluator.close();
+    }
+
+    @Test
+    void testOrParallelMultipleFields() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2);
+        GlobalIndexResult resultB = resultOf(3, 4);
+
+        ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new 
ConcurrentHashMap<>();
+        fieldResults.put(0, resultA);
+        fieldResults.put(1, resultB);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        
readerReturning(fieldResults.get(fieldId))),
+                        executor);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate = PredicateBuilder.or(builder.equal(0, 42), 
builder.equal(1, 99));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L, 4L);
+        evaluator.close();
+    }
+
+    @Test
+    void testOrReturnsEmptyWhenChildUnsupported() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId -> {
+                            if (fieldId == 0) {
+                                return 
Collections.singletonList(readerReturning(resultA));
+                            }
+                            return Collections.emptyList();
+                        },
+                        executor);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate = PredicateBuilder.or(builder.equal(0, 42), 
builder.equal(1, 99));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isEmpty();
+        evaluator.close();
+    }
+
+    @Test
+    void testAndWithEmptyResultShortCircuits() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2, 3);
+        GlobalIndexResult resultB = resultOf(10, 11);
+
+        ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new 
ConcurrentHashMap<>();
+        fieldResults.put(0, resultA);
+        fieldResults.put(1, resultB);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        
readerReturning(fieldResults.get(fieldId))),
+                        executor);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate = PredicateBuilder.and(builder.equal(0, 42), 
builder.equal(1, 99));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        assertThat(result.get().results().isEmpty()).isTrue();
+        evaluator.close();
+    }
+
+    @Test
+    void testParallelUsesMultipleThreads() {
+        executor = Executors.newFixedThreadPool(3);
+        RowType rowType = rowType();
+
+        ConcurrentHashMap<String, Boolean> threadNames = new 
ConcurrentHashMap<>();
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        new 
StubGlobalIndexReader(resultOf(fieldId, fieldId + 10)) {
+                                            @Override
+                                            public Optional<GlobalIndexResult> 
visitEqual(
+                                                    FieldRef fieldRef, Object 
literal) {
+                                                threadNames.put(
+                                                        
Thread.currentThread().getName(), true);
+                                                return 
super.visitEqual(fieldRef, literal);
+                                            }
+                                        }),
+                        executor);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(builder.equal(0, 1), builder.equal(1, 2), 
builder.equal(2, 3));
+
+        evaluator.evaluate(predicate);
+
+        assertThat(threadNames.size()).isGreaterThan(1);
+        evaluator.close();
+    }
+
+    @Test
+    void testNullExecutorFallsBackToSequential() {
+        RowType rowType = rowType();
+
+        AtomicInteger callCount = new AtomicInteger();
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId -> {
+                            callCount.incrementAndGet();
+                            return Collections.singletonList(
+                                    readerReturning(resultOf(fieldId, fieldId 
+ 10)));
+                        },
+                        null);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate = PredicateBuilder.and(builder.equal(0, 1), 
builder.equal(1, 2));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        assertThat(callCount.get()).isEqualTo(2);
+        evaluator.close();
+    }
+
+    @Test
+    void testNestedAndPredicateDoesNotDeadlockWithSmallPool() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+        GlobalIndexResult resultB = resultOf(3, 4, 5, 6, 7);
+        GlobalIndexResult resultC = resultOf(4, 5, 8, 9);
+
+        ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new 
ConcurrentHashMap<>();
+        fieldResults.put(0, resultA);
+        fieldResults.put(1, resultB);
+        fieldResults.put(2, resultC);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        
readerReturning(fieldResults.get(fieldId))),
+                        executor);
+
+        // and(a, b, c) builds as and(and(a, b), c) — nested binary tree
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(builder.equal(0, 1), builder.equal(1, 2), 
builder.equal(2, 3));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        // intersection of {1..5}, {3..7}, {4,5,8,9} -> {4,5}
+        assertBitmapContainsExactly(result.get().results(), 4L, 5L);
+        evaluator.close();
+    }
+
+    @Test
+    void testNestedOrPredicateDoesNotDeadlockWithSmallPool() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2);
+        GlobalIndexResult resultB = resultOf(3, 4);
+        GlobalIndexResult resultC = resultOf(5, 6);
+
+        ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new 
ConcurrentHashMap<>();
+        fieldResults.put(0, resultA);
+        fieldResults.put(1, resultB);
+        fieldResults.put(2, resultC);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        
readerReturning(fieldResults.get(fieldId))),
+                        executor);
+
+        // or(a, b, c) builds as or(or(a, b), c) — nested binary tree
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.or(builder.equal(0, 1), builder.equal(1, 2), 
builder.equal(2, 3));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        // union of {1,2}, {3,4}, {5,6}
+        assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L, 4L, 
5L, 6L);
+        evaluator.close();
+    }
+
+    @Test
+    void testMixedNestedPredicateDoesNotDeadlockWithSmallPool() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+        GlobalIndexResult resultB = resultOf(3, 4, 5, 6, 7);
+        GlobalIndexResult resultC = resultOf(1, 2, 3, 10, 11);
+        // Field c used for second OR child - distinct from field a/b
+
+        ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new 
ConcurrentHashMap<>();
+        fieldResults.put(0, resultA);
+        fieldResults.put(1, resultB);
+        fieldResults.put(2, resultC);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        
readerReturning(fieldResults.get(fieldId))),
+                        executor);
+
+        // AND(OR(a, b), OR(a, c)) — mixed nesting, different compound types
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(
+                        PredicateBuilder.or(builder.equal(0, 1), 
builder.equal(1, 2)),
+                        PredicateBuilder.or(builder.equal(0, 3), 
builder.equal(2, 4)));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        // OR(a, b) = union({1..5}, {3..7}) = {1..7}
+        // OR(a, c) = union({1..5}, {1,2,3,10,11}) = {1,2,3,4,5,10,11}
+        // AND = intersection = {1,2,3,4,5}
+        assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L, 4L, 
5L);
+        evaluator.close();
+    }
+
+    @Test
+    void testDeepMixedNestedPredicateDoesNotDeadlockWithSmallPool() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+        GlobalIndexResult resultB = resultOf(2, 3, 4, 5, 6);
+        GlobalIndexResult resultC = resultOf(3, 4, 5, 6, 7);
+
+        ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new 
ConcurrentHashMap<>();
+        fieldResults.put(0, resultA);
+        fieldResults.put(1, resultB);
+        fieldResults.put(2, resultC);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Collections.singletonList(
+                                        
readerReturning(fieldResults.get(fieldId))),
+                        executor);
+
+        // AND(OR(AND(a, b), c), OR(AND(a, c), b)) — deep mixed nesting
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(
+                        PredicateBuilder.or(
+                                PredicateBuilder.and(builder.equal(0, 1), 
builder.equal(1, 2)),
+                                builder.equal(2, 3)),
+                        PredicateBuilder.or(
+                                PredicateBuilder.and(builder.equal(0, 4), 
builder.equal(2, 5)),
+                                builder.equal(1, 6)));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        // OR(AND(a,b), c): AND(a,b)={2,3,4,5}, c={3..7} => union={2,3,4,5,6,7}
+        // OR(AND(a,c), b): AND(a,c)={3,4,5}, b={2..6} => union={2,3,4,5,6}
+        // top AND: intersection = {2,3,4,5,6}
+        assertBitmapContainsExactly(result.get().results(), 2L, 3L, 4L, 5L, 
6L);
+        evaluator.close();
+    }
+
+    @Test
+    void testSameFieldPredicatesNotAccessedConcurrently() {
+        executor = Executors.newFixedThreadPool(4);
+        RowType rowType = rowType();
+
+        AtomicInteger concurrency = new AtomicInteger(0);
+        AtomicInteger maxConcurrency = new AtomicInteger(0);
+
+        GlobalIndexReader concurrencyDetectingReader =
+                new StubGlobalIndexReader(resultOf(1, 2, 3, 4, 5)) {
+                    @Override
+                    public Optional<GlobalIndexResult> visitEqual(
+                            FieldRef fieldRef, Object literal) {
+                        int c = concurrency.incrementAndGet();
+                        maxConcurrency.updateAndGet(cur -> Math.max(cur, c));
+                        try {
+                            Thread.sleep(50);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        }
+                        concurrency.decrementAndGet();
+                        return super.visitEqual(fieldRef, literal);
+                    }
+                };
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId -> 
Collections.singletonList(concurrencyDetectingReader),
+                        executor);
+
+        // AND(a=1, a=2, a=3) — all same field, must not run concurrently
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(builder.equal(0, 1), builder.equal(0, 2), 
builder.equal(0, 3));
+
+        evaluator.evaluate(predicate);
+
+        assertThat(maxConcurrency.get()).isEqualTo(1);
+        evaluator.close();
+    }
+
+    @Test
+    void testMixedNestedSameFieldNotAccessedConcurrently() {
+        executor = Executors.newFixedThreadPool(4);
+        RowType rowType = rowType();
+
+        AtomicInteger concurrencyA = new AtomicInteger(0);
+        AtomicInteger maxConcurrencyA = new AtomicInteger(0);
+
+        GlobalIndexReader concurrencyDetectingReaderA =
+                new StubGlobalIndexReader(resultOf(1, 2, 3, 4, 5)) {
+                    @Override
+                    public Optional<GlobalIndexResult> visitEqual(
+                            FieldRef fieldRef, Object literal) {
+                        int c = concurrencyA.incrementAndGet();
+                        maxConcurrencyA.updateAndGet(cur -> Math.max(cur, c));
+                        try {
+                            Thread.sleep(50);
+                        } catch (InterruptedException e) {
+                            Thread.currentThread().interrupt();
+                        }
+                        concurrencyA.decrementAndGet();
+                        return super.visitEqual(fieldRef, literal);
+                    }
+                };
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId -> {
+                            if (fieldId == 0) {
+                                return 
Collections.singletonList(concurrencyDetectingReaderA);
+                            }
+                            return Collections.singletonList(
+                                    readerReturning(resultOf(1, 2, 3, 4, 5)));
+                        },
+                        executor);
+
+        // AND(OR(a=1, b=2), OR(a=3, c=4)) — field a appears in both OR 
subtrees
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(
+                        PredicateBuilder.or(builder.equal(0, 1), 
builder.equal(1, 2)),
+                        PredicateBuilder.or(builder.equal(0, 3), 
builder.equal(2, 4)));
+
+        evaluator.evaluate(predicate);
+
+        assertThat(maxConcurrencyA.get()).isEqualTo(1);
+        evaluator.close();
+    }
+
+    @Test
+    void testLazyResultNotMaterializedConcurrently() {
+        executor = Executors.newFixedThreadPool(4);
+        RowType rowType = rowType();
+
+        AtomicInteger concurrency = new AtomicInteger(0);
+        AtomicInteger maxConcurrency = new AtomicInteger(0);
+
+        GlobalIndexReader lazyReader =
+                new StubGlobalIndexReader(null) {
+                    @Override
+                    public Optional<GlobalIndexResult> visitEqual(
+                            FieldRef fieldRef, Object literal) {
+                        return Optional.of(
+                                GlobalIndexResult.create(
+                                        () -> {
+                                            int c = 
concurrency.incrementAndGet();
+                                            maxConcurrency.updateAndGet(cur -> 
Math.max(cur, c));
+                                            try {
+                                                Thread.sleep(50);
+                                            } catch (InterruptedException e) {
+                                                
Thread.currentThread().interrupt();
+                                            }
+                                            concurrency.decrementAndGet();
+                                            RoaringNavigableMap64 bm = new 
RoaringNavigableMap64();
+                                            bm.add(1);
+                                            bm.add(2);
+                                            bm.add(3);
+                                            return bm;
+                                        }));
+                    }
+                };
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId -> {
+                            if (fieldId == 0) {
+                                return Collections.singletonList(lazyReader);
+                            }
+                            return Collections.singletonList(
+                                    readerReturning(resultOf(1, 2, 3, 4, 5)));
+                        },
+                        executor);
+
+        // AND(OR(a=1, b=2), OR(a=3, c=4)) — field a in both OR subtrees
+        // lazy results for field a must not be materialized concurrently
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                PredicateBuilder.and(
+                        PredicateBuilder.or(builder.equal(0, 1), 
builder.equal(1, 2)),
+                        PredicateBuilder.or(builder.equal(0, 3), 
builder.equal(2, 4)));
+
+        evaluator.evaluate(predicate);
+
+        assertThat(maxConcurrency.get()).isEqualTo(1);
+        evaluator.close();
+    }
+
+    @Test
+    void testMultipleReadersPerFieldCombinedWithAnd() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult readerResult1 = resultOf(1, 2, 3, 4, 5);
+        GlobalIndexResult readerResult2 = resultOf(3, 4, 5, 6, 7);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId ->
+                                Arrays.asList(
+                                        readerReturning(readerResult1),
+                                        readerReturning(readerResult2)),
+                        executor);
+
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate = builder.equal(0, 42);
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        // Multiple readers for same field are combined with AND (intersection)
+        assertBitmapContainsExactly(result.get().results(), 3L, 4L, 5L);
+        evaluator.close();
+    }
+
+    @Test
+    void testNonFieldLeafPredicateDoesNotThrow() {
+        executor = Executors.newFixedThreadPool(2);
+        RowType rowType = rowType();
+
+        GlobalIndexResult resultA = resultOf(1, 2, 3);
+
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(
+                        rowType,
+                        fieldId -> 
Collections.singletonList(readerReturning(resultA)),
+                        executor);
+
+        // Manually build AND(alwaysTrue, a=1) to bypass PredicateBuilder 
simplification
+        PredicateBuilder builder = new PredicateBuilder(rowType);
+        Predicate predicate =
+                new CompoundPredicate(
+                        And.INSTANCE,
+                        Arrays.asList(PredicateBuilder.alwaysTrue(), 
builder.equal(0, 42)));
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+        assertThat(result).isPresent();
+        assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L);
+        evaluator.close();
+    }
+
+    @Test
+    void testNullPredicate() {
+        RowType rowType = rowType();
+        GlobalIndexEvaluator evaluator =
+                new GlobalIndexEvaluator(rowType, fieldId -> 
Collections.emptyList(), null);
+
+        Optional<GlobalIndexResult> result = evaluator.evaluate(null);
+
+        assertThat(result).isEmpty();
+        evaluator.close();
+    }
+
+    private static void assertBitmapContainsExactly(
+            RoaringNavigableMap64 bitmap, long... expected) {
+        assertThat(bitmap.getLongCardinality()).isEqualTo(expected.length);
+        for (long val : expected) {
+            assertThat(bitmap.contains(val)).isTrue();
+        }
+    }
+
+    private static class StubGlobalIndexReader implements GlobalIndexReader {
+
+        private final GlobalIndexResult result;
+
+        StubGlobalIndexReader(GlobalIndexResult result) {
+            this.result = result;
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef, 
Object literal) {
+            return Optional.ofNullable(result);
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
+            return Optional.ofNullable(result);
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef, 
Object literal) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef, 
Object literal) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef, 
Object literal) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object 
literal) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef, 
Object literal) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef 
fieldRef, Object literal) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef, 
Object literal) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef, 
Object literal) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef, 
Object literal) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef, 
List<Object> literals) {
+            return Optional.empty();
+        }
+
+        @Override
+        public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef, 
List<Object> literals) {
+            return Optional.empty();
+        }
+
+        @Override
+        public void close() {}
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
index 3092f1b3da..3e59138019 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
@@ -32,7 +32,6 @@ import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.ManifestReadThreadPool;
 import org.apache.paimon.utils.Range;
 
 import java.io.Closeable;
@@ -54,6 +53,7 @@ import static 
org.apache.paimon.CoreOptions.GLOBAL_INDEX_THREAD_NUM;
 import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
 import static 
org.apache.paimon.table.source.snapshot.TimeTravelUtil.tryTravelOrLatest;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
 
 /** Scanner for shard-based global indexes. */
 public class GlobalIndexScanner implements Closeable {
@@ -70,8 +70,10 @@ public class GlobalIndexScanner implements Closeable {
             IndexPathFactory indexPathFactory,
             Collection<IndexFileMeta> indexFiles) {
         this.options = options;
-        this.executor =
-                
ManifestReadThreadPool.getExecutorService(options.get(GLOBAL_INDEX_THREAD_NUM));
+        Integer threadNum = options.get(GLOBAL_INDEX_THREAD_NUM);
+        int parallelism =
+                threadNum != null ? threadNum : 
Runtime.getRuntime().availableProcessors();
+        this.executor = createCachedThreadPool(parallelism, 
"GLOBAL-INDEX-POOL");
         this.indexPathFactory = indexPathFactory;
         GlobalIndexFileReader indexFileReader = meta -> 
fileIO.newInputStream(meta.filePath());
         Map<Integer, Map<String, Map<Range, List<IndexFileMeta>>>> indexMetas 
= new HashMap<>();
@@ -94,7 +96,7 @@ public class GlobalIndexScanner implements Closeable {
                                 indexFileReader,
                                 indexMetas.get(fieldId),
                                 rowType.getField(fieldId));
-        this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType, 
readersFunction);
+        this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType, 
readersFunction, executor);
     }
 
     public static Optional<GlobalIndexScanner> create(
@@ -176,7 +178,7 @@ public class GlobalIndexScanner implements Closeable {
                     unionReader.add(innerReader);
                 }
 
-                readers.add(new UnionGlobalIndexReader(unionReader, executor));
+                readers.add(new UnionGlobalIndexReader(unionReader));
             }
         } catch (IOException e) {
             throw new RuntimeException("Failed to create global index reader", 
e);
@@ -195,5 +197,6 @@ public class GlobalIndexScanner implements Closeable {
     @Override
     public void close() throws IOException {
         globalIndexEvaluator.close();
+        executor.shutdownNow();
     }
 }
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py 
b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index bfd3c31709..f399b84de6 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -17,6 +17,9 @@
 
 """Global index evaluator for filtering data using global indexes."""
 
+import threading
+from collections import deque
+from concurrent.futures import Executor, Future
 from typing import Callable, Collection, Dict, List, Optional
 
 from pypaimon.globalindex.global_index_reader import GlobalIndexReader, 
FieldRef
@@ -25,108 +28,204 @@ from pypaimon.common.predicate import Predicate
 from pypaimon.schema.data_types import DataField
 
 
+class _DirectExecutor(Executor):
+    """Executor that runs callables in the calling thread."""
+
+    def submit(self, fn, *args, **kwargs):
+        f = Future()
+        try:
+            result = fn(*args, **kwargs)
+            f.set_result(result)
+        except Exception as e:
+            f.set_exception(e)
+        return f
+
+    def shutdown(self, wait=True):
+        pass
+
+
 class GlobalIndexEvaluator:
-    """
-    Predicate evaluator for filtering data using global indexes.
-    """
+    """Predicate evaluator for filtering data using global indexes."""
 
     def __init__(
         self,
         fields: List[DataField],
-        readers_function: Callable[[DataField], Collection[GlobalIndexReader]]
+        readers_function: Callable[[DataField], Collection[GlobalIndexReader]],
+        executor: Optional[Executor] = None,
     ):
         self._fields = fields
         self._field_by_name = {f.name: f for f in fields}
         self._readers_function = readers_function
         self._index_readers_cache: Dict[int, Collection[GlobalIndexReader]] = 
{}
+        self._reader_locks: Dict[int, threading.Lock] = {}
+        self._locks_lock = threading.Lock()
+        self._executor = executor if executor is not None else 
_DirectExecutor()
 
     def evaluate(
         self,
         predicate: Optional[Predicate]
     ) -> Optional[GlobalIndexResult]:
-        compound_result: Optional[GlobalIndexResult] = None
-        
-        # Evaluate predicate first
-        if predicate is not None:
-            compound_result = self._visit_predicate(predicate)
-        
-        return compound_result
+        if predicate is None:
+            return None
+        future = self._visit_async(predicate)
+        return future.result()
 
-    def _visit_predicate(self, predicate: Predicate) -> 
Optional[GlobalIndexResult]:
-        """Visit a predicate and return the index result."""
-        if predicate.method == 'and':
-            compound_result: Optional[GlobalIndexResult] = None
-            for child in predicate.literals:
-                child_result = self._visit_predicate(child)
-                
-                if child_result is not None:
-                    if compound_result is not None:
-                        compound_result = compound_result.and_(child_result)
-                    else:
-                        compound_result = child_result
-                
-                if compound_result is not None and compound_result.is_empty():
-                    return compound_result
-            
-            return compound_result
-        
-        elif predicate.method == 'or':
-            compound_result = GlobalIndexResult.create_empty()
-            for child in predicate.literals:
-                child_result = self._visit_predicate(child)
-                
-                if child_result is None:
-                    return None
-                
-                compound_result = compound_result.or_(child_result)
-            
-            return compound_result
-        
-        else:
-            # Leaf predicate
-            return self._visit_leaf_predicate(predicate)
+    def _visit_async(self, predicate) -> Future:
+        if isinstance(predicate, Predicate) and predicate.method in ('and', 
'or'):
+            return self._visit_compound_async(predicate)
+        return self._visit_leaf_async(predicate)
 
-    def _visit_leaf_predicate(self, predicate: Predicate) -> 
Optional[GlobalIndexResult]:
-        """Visit a leaf predicate and return the index result."""
+    def _visit_leaf_async(self, predicate: Predicate) -> Future:
         field = self._field_by_name.get(predicate.field)
         if field is None:
-            return None
-        
+            f = Future()
+            f.set_result(None)
+            return f
+
         field_id = field.id
         readers = self._index_readers_cache.get(field_id)
         if readers is None:
             readers = self._readers_function(field)
             self._index_readers_cache[field_id] = readers
-        
+
         field_ref = FieldRef(predicate.index, predicate.field, str(field.type))
-        
-        compound_result: Optional[GlobalIndexResult] = None
-        
+
+        reader_futures = []
         for reader in readers:
-            child_result = self._visit_function(reader, predicate, field_ref)
+            lock = self._get_reader_lock(id(reader))
+            reader_futures.append(
+                self._executor.submit(
+                    self._visit_reader, reader, predicate, field_ref, lock
+                )
+            )
+
+        all_done = Future()
+        if not reader_futures:
+            all_done.set_result(None)
+            return all_done
+
+        remaining = [len(reader_futures)]
+        count_lock = threading.Lock()
+
+        def on_done(_):
+            with count_lock:
+                remaining[0] -= 1
+                if remaining[0] == 0:
+                    try:
+                        all_done.set_result(
+                            self._combine_reader_results(reader_futures)
+                        )
+                    except Exception as e:
+                        all_done.set_exception(e)
+
+        for rf in reader_futures:
+            rf.add_done_callback(on_done)
+
+        return all_done
+
+    def _visit_reader(self, reader, predicate, field_ref, lock):
+        with lock:
+            result = self._visit_function(reader, predicate, field_ref)
+            if result is not None:
+                result.results()
+            return result
+
+    def _combine_reader_results(
+        self, reader_futures: List[Future]
+    ) -> Optional[GlobalIndexResult]:
+        compound_result: Optional[GlobalIndexResult] = None
+        for f in reader_futures:
+            child_result = f.result()
             if child_result is None:
                 continue
-            
             if compound_result is not None:
-                compound_result = compound_result.or_(child_result)
+                compound_result = compound_result.and_(child_result)
             else:
                 compound_result = child_result
-            
             if compound_result.is_empty():
                 return compound_result
-        
         return compound_result
 
+    def _visit_compound_async(self, predicate: Predicate) -> Future:
+        children = self._flatten_children(predicate.method, predicate.literals)
+        child_futures = [self._visit_async(child) for child in children]
+
+        all_done = Future()
+        if not child_futures:
+            all_done.set_result(None)
+            return all_done
+
+        remaining = [len(child_futures)]
+        lock = threading.Lock()
+
+        def on_done(_):
+            with lock:
+                remaining[0] -= 1
+                if remaining[0] == 0:
+                    try:
+                        results = [f.result() for f in child_futures]
+                        all_done.set_result(
+                            self._combine_results(results, predicate.method)
+                        )
+                    except Exception as e:
+                        all_done.set_exception(e)
+
+        for cf in child_futures:
+            cf.add_done_callback(on_done)
+
+        return all_done
+
+    def _combine_results(
+        self, results: List[Optional[GlobalIndexResult]], method: str
+    ) -> Optional[GlobalIndexResult]:
+        if method == 'or':
+            compound_result = GlobalIndexResult.create_empty()
+            for child_result in results:
+                if child_result is None:
+                    return None
+                compound_result = compound_result.or_(child_result)
+            return compound_result
+        else:
+            compound_result: Optional[GlobalIndexResult] = None
+            for child_result in results:
+                if child_result is not None:
+                    if compound_result is not None:
+                        compound_result = compound_result.and_(child_result)
+                    else:
+                        compound_result = child_result
+                if compound_result is not None and compound_result.is_empty():
+                    return compound_result
+            return compound_result
+
+    def _flatten_children(self, method: str, children) -> list:
+        result = []
+        stack = deque(children)
+        while stack:
+            child = stack.popleft()
+            if isinstance(child, Predicate) and child.method == method:
+                for grandchild in reversed(child.literals):
+                    stack.appendleft(grandchild)
+            else:
+                result.append(child)
+        return result
+
+    def _get_reader_lock(self, reader_id: int) -> threading.Lock:
+        with self._locks_lock:
+            lock = self._reader_locks.get(reader_id)
+            if lock is None:
+                lock = threading.Lock()
+                self._reader_locks[reader_id] = lock
+            return lock
+
     def _visit_function(
         self,
         reader: GlobalIndexReader,
         predicate: Predicate,
         field_ref: FieldRef
     ) -> Optional[GlobalIndexResult]:
-        """Visit a predicate function with the given reader."""
         method = predicate.method
         literals = predicate.literals
-        
+
         if method == 'equal':
             return reader.visit_equal(field_ref, literals[0])
         elif method == 'notEqual':
@@ -161,7 +260,6 @@ class GlobalIndexEvaluator:
         return None
 
     def close(self) -> None:
-        """Close the evaluator and release resources."""
         for readers in self._index_readers_cache.values():
             for reader in readers:
                 try:
diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py 
b/paimon-python/pypaimon/globalindex/global_index_scanner.py
index bba5108466..e8e34f641a 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scanner.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py
@@ -17,6 +17,8 @@
 
 """Scanner for shard-based global indexes."""
 
+import os
+from concurrent.futures import ThreadPoolExecutor
 from typing import Collection, Optional
 
 from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
@@ -37,9 +39,15 @@ class GlobalIndexScanner:
         fields: list,
         file_io,
         index_path: str,
-        index_files: Collection['IndexFileMeta']
+        index_files: Collection['IndexFileMeta'],
+        thread_num: Optional[int] = None,
     ):
-        self._evaluator = self._create_evaluator(fields, file_io, index_path, 
index_files)
+        self._executor = ThreadPoolExecutor(
+            max_workers=thread_num or os.cpu_count() or 4
+        )
+        self._evaluator = self._create_evaluator(
+            fields, file_io, index_path, index_files
+        )
 
     def _create_evaluator(self, fields, file_io, index_path, index_files):
         index_metas = {}
@@ -71,7 +79,7 @@ class GlobalIndexScanner:
         def readers_function(field: DataField) -> 
Collection[GlobalIndexReader]:
             return _create_readers(file_io, index_path, 
index_metas.get(field.id), field)
 
-        return GlobalIndexEvaluator(fields, readers_function)
+        return GlobalIndexEvaluator(fields, readers_function, self._executor)
 
     @staticmethod
     def create(table, index_files=None, partition_filter=None, predicate=None) 
-> Optional['GlobalIndexScanner']:
@@ -90,7 +98,8 @@ class GlobalIndexScanner:
                 fields=table.fields,
                 file_io=table.file_io,
                 
index_path=table.path_factory().global_index_path_factory().index_path(),
-                index_files=index_files
+                index_files=index_files,
+                thread_num=table.options.global_index_thread_num(),
             )
 
         # Scan index files from snapshot using partition_filter and predicate
@@ -121,7 +130,8 @@ class GlobalIndexScanner:
             fields=table.fields,
             file_io=table.file_io,
             
index_path=table.path_factory().global_index_path_factory().index_path(),
-            index_files=scanned_index_files
+            index_files=scanned_index_files,
+            thread_num=table.options.global_index_thread_num(),
         )
 
     def scan(self, predicate: Optional[Predicate]) -> 
Optional[GlobalIndexResult]:
@@ -131,6 +141,7 @@ class GlobalIndexScanner:
     def close(self):
         """Close the scanner and release resources."""
         self._evaluator.close()
+        self._executor.shutdown(wait=False)
 
     def __enter__(self) -> 'GlobalIndexScanner':
         return self
diff --git a/paimon-python/pypaimon/tests/global_index_evaluator_test.py 
b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
new file mode 100644
index 0000000000..3017835d78
--- /dev/null
+++ b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
@@ -0,0 +1,660 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from concurrent.futures import ThreadPoolExecutor
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.schema.data_types import DataField, AtomicType
+from pypaimon.utils.range import Range
+
+
+class StubGlobalIndexReader(GlobalIndexReader):
+    """A test reader that returns a fixed result for equal predicates."""
+
+    def __init__(self, result):
+        self._result = result
+
+    def visit_equal(self, field_ref, literal):
+        return self._result
+
+    def close(self):
+        pass
+
+
+def _make_fields():
+    return [
+        DataField(0, "a", AtomicType("INT")),
+        DataField(1, "b", AtomicType("INT")),
+        DataField(2, "c", AtomicType("INT")),
+    ]
+
+
+def _result_of(*row_ids):
+    return GlobalIndexResult.from_range(Range(min(row_ids), max(row_ids)))
+
+
+class GlobalIndexEvaluatorTest(unittest.TestCase):
+
+    def test_single_field_sequential(self):
+        fields = _make_fields()
+        expected = GlobalIndexResult.from_range(Range(1, 3))
+
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(expected)],
+        )
+
+        predicate = Predicate(method='equal', index=0, field='a', 
literals=[42])
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        self.assertEqual(result.results().cardinality(), 3)
+        evaluator.close()
+
+    def test_and_parallel_multiple_fields(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 5))
+        result_b = GlobalIndexResult.from_range(Range(3, 7))
+
+        field_results = {0: result_a, 1: result_b}
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(field_results[field.id])],
+            executor,
+        )
+
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(method='equal', index=0, field='a', literals=[42]),
+                Predicate(method='equal', index=1, field='b', literals=[99]),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        bm = result.results()
+        # intersection of [1,5] and [3,7] -> [3,5]
+        self.assertEqual(bm.cardinality(), 3)
+        for v in [3, 4, 5]:
+            self.assertTrue(bm.contains(v))
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_or_parallel_multiple_fields(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 2))
+        result_b = GlobalIndexResult.from_range(Range(10, 11))
+
+        field_results = {0: result_a, 1: result_b}
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(field_results[field.id])],
+            executor,
+        )
+
+        predicate = Predicate(
+            method='or', index=None, field=None,
+            literals=[
+                Predicate(method='equal', index=0, field='a', literals=[42]),
+                Predicate(method='equal', index=1, field='b', literals=[99]),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        bm = result.results()
+        # union of [1,2] and [10,11] -> {1,2,10,11}
+        self.assertEqual(bm.cardinality(), 4)
+        for v in [1, 2, 10, 11]:
+            self.assertTrue(bm.contains(v))
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_or_returns_none_when_child_unsupported(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 2))
+
+        def readers_fn(field):
+            if field.id == 0:
+                return [StubGlobalIndexReader(result_a)]
+            return []
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(fields, readers_fn, executor)
+
+        predicate = Predicate(
+            method='or', index=None, field=None,
+            literals=[
+                Predicate(method='equal', index=0, field='a', literals=[42]),
+                Predicate(method='equal', index=1, field='b', literals=[99]),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNone(result)
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_and_with_disjoint_results(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 3))
+        result_b = GlobalIndexResult.from_range(Range(10, 12))
+
+        field_results = {0: result_a, 1: result_b}
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(field_results[field.id])],
+            executor,
+        )
+
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(method='equal', index=0, field='a', literals=[42]),
+                Predicate(method='equal', index=1, field='b', literals=[99]),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        self.assertTrue(result.is_empty())
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_no_executor_falls_back_to_sequential(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 5))
+        result_b = GlobalIndexResult.from_range(Range(3, 7))
+
+        field_results = {0: result_a, 1: result_b}
+        call_count = [0]
+
+        def readers_fn(field):
+            call_count[0] += 1
+            return [StubGlobalIndexReader(field_results[field.id])]
+
+        evaluator = GlobalIndexEvaluator(fields, readers_fn)
+
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(method='equal', index=0, field='a', literals=[42]),
+                Predicate(method='equal', index=1, field='b', literals=[99]),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        self.assertEqual(call_count[0], 2)
+        evaluator.close()
+
+    def test_nested_and_does_not_deadlock_with_small_pool(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 5))
+        result_b = GlobalIndexResult.from_range(Range(3, 7))
+        result_c = GlobalIndexResult.from_range(Range(4, 5))
+
+        field_results = {0: result_a, 1: result_b, 2: result_c}
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(field_results[field.id])],
+            executor,
+        )
+
+        # Nested binary tree: and(and(a, b), c)
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(
+                    method='and', index=None, field=None,
+                    literals=[
+                        Predicate(method='equal', index=0, field='a', 
literals=[1]),
+                        Predicate(method='equal', index=1, field='b', 
literals=[2]),
+                    ],
+                ),
+                Predicate(method='equal', index=2, field='c', literals=[3]),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        bm = result.results()
+        # intersection of [1,5], [3,7], [4,5] -> [4,5]
+        self.assertEqual(bm.cardinality(), 2)
+        for v in [4, 5]:
+            self.assertTrue(bm.contains(v))
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_nested_or_does_not_deadlock_with_small_pool(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 2))
+        result_b = GlobalIndexResult.from_range(Range(3, 4))
+        result_c = GlobalIndexResult.from_range(Range(5, 6))
+
+        field_results = {0: result_a, 1: result_b, 2: result_c}
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(field_results[field.id])],
+            executor,
+        )
+
+        # Nested binary tree: or(or(a, b), c)
+        predicate = Predicate(
+            method='or', index=None, field=None,
+            literals=[
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(method='equal', index=0, field='a', 
literals=[1]),
+                        Predicate(method='equal', index=1, field='b', 
literals=[2]),
+                    ],
+                ),
+                Predicate(method='equal', index=2, field='c', literals=[3]),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        bm = result.results()
+        # union of [1,2], [3,4], [5,6] -> {1,2,3,4,5,6}
+        self.assertEqual(bm.cardinality(), 6)
+        for v in [1, 2, 3, 4, 5, 6]:
+            self.assertTrue(bm.contains(v))
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_mixed_nested_does_not_deadlock_with_small_pool(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 5))
+        result_b = GlobalIndexResult.from_range(Range(3, 7))
+        result_c = GlobalIndexResult.from_range(Range(1, 3))
+
+        field_results = {0: result_a, 1: result_b, 2: result_c}
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(field_results[field.id])],
+            executor,
+        )
+
+        # AND(OR(a, b), OR(a, c)) — mixed nesting
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(method='equal', index=0, field='a', 
literals=[1]),
+                        Predicate(method='equal', index=1, field='b', 
literals=[2]),
+                    ],
+                ),
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(method='equal', index=0, field='a', 
literals=[3]),
+                        Predicate(method='equal', index=2, field='c', 
literals=[4]),
+                    ],
+                ),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        bm = result.results()
+        # OR(a, b) = union([1,5], [3,7]) = [1,7]
+        # OR(a, c) = union([1,5], [1,3]) = [1,5]
+        # AND = intersection = [1,5]
+        self.assertEqual(bm.cardinality(), 5)
+        for v in [1, 2, 3, 4, 5]:
+            self.assertTrue(bm.contains(v))
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_deep_mixed_nested_does_not_deadlock_with_small_pool(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 5))
+        result_b = GlobalIndexResult.from_range(Range(2, 6))
+        result_c = GlobalIndexResult.from_range(Range(3, 7))
+
+        field_results = {0: result_a, 1: result_b, 2: result_c}
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(field_results[field.id])],
+            executor,
+        )
+
+        # AND(OR(AND(a, b), c), OR(AND(a, c), b)) — deep mixed nesting
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(
+                            method='and', index=None, field=None,
+                            literals=[
+                                Predicate(method='equal', index=0, field='a', 
literals=[1]),
+                                Predicate(method='equal', index=1, field='b', 
literals=[2]),
+                            ],
+                        ),
+                        Predicate(method='equal', index=2, field='c', 
literals=[3]),
+                    ],
+                ),
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(
+                            method='and', index=None, field=None,
+                            literals=[
+                                Predicate(method='equal', index=0, field='a', 
literals=[4]),
+                                Predicate(method='equal', index=2, field='c', 
literals=[5]),
+                            ],
+                        ),
+                        Predicate(method='equal', index=1, field='b', 
literals=[6]),
+                    ],
+                ),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        bm = result.results()
+        # OR(AND(a,b), c): AND(a,b)=intersect([1,5],[2,6])=[2,5], c=[3,7] => 
union=[2,7]
+        # OR(AND(a,c), b): AND(a,c)=intersect([1,5],[3,7])=[3,5], b=[2,6] => 
union=[2,6]
+        # top AND: intersect([2,7],[2,6]) = [2,6]
+        self.assertEqual(bm.cardinality(), 5)
+        for v in [2, 3, 4, 5, 6]:
+            self.assertTrue(bm.contains(v))
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_same_field_predicates_not_accessed_concurrently(self):
+        import threading
+        import time
+
+        fields = _make_fields()
+
+        concurrency = [0]
+        max_concurrency = [0]
+        lock = threading.Lock()
+
+        class ConcurrencyDetectingReader(GlobalIndexReader):
+            def __init__(self, result):
+                self._result = result
+
+            def visit_equal(self, field_ref, literal):
+                with lock:
+                    concurrency[0] += 1
+                    max_concurrency[0] = max(max_concurrency[0], 
concurrency[0])
+                time.sleep(0.05)
+                with lock:
+                    concurrency[0] -= 1
+                return self._result
+
+            def close(self):
+                pass
+
+        result_a = GlobalIndexResult.from_range(Range(1, 5))
+        reader = ConcurrencyDetectingReader(result_a)
+
+        executor = ThreadPoolExecutor(max_workers=4)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [reader],
+            executor,
+        )
+
+        # AND(a=1, a=2, a=3) — all same field, must not run concurrently
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(method='equal', index=0, field='a', literals=[1]),
+                Predicate(method='equal', index=0, field='a', literals=[2]),
+                Predicate(method='equal', index=0, field='a', literals=[3]),
+            ],
+        )
+
+        evaluator.evaluate(predicate)
+
+        self.assertEqual(max_concurrency[0], 1)
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_mixed_nested_same_field_not_accessed_concurrently(self):
+        import threading
+        import time
+
+        fields = _make_fields()
+
+        concurrency_a = [0]
+        max_concurrency_a = [0]
+        lock = threading.Lock()
+
+        class ConcurrencyDetectingReader(GlobalIndexReader):
+            def __init__(self, result):
+                self._result = result
+
+            def visit_equal(self, field_ref, literal):
+                with lock:
+                    concurrency_a[0] += 1
+                    max_concurrency_a[0] = max(max_concurrency_a[0], 
concurrency_a[0])
+                time.sleep(0.05)
+                with lock:
+                    concurrency_a[0] -= 1
+                return self._result
+
+            def close(self):
+                pass
+
+        result_all = GlobalIndexResult.from_range(Range(1, 5))
+        detecting_reader = ConcurrencyDetectingReader(result_all)
+        normal_reader = StubGlobalIndexReader(result_all)
+
+        def readers_fn(field):
+            if field.id == 0:
+                return [detecting_reader]
+            return [normal_reader]
+
+        executor = ThreadPoolExecutor(max_workers=4)
+        evaluator = GlobalIndexEvaluator(fields, readers_fn, executor)
+
+        # AND(OR(a=1, b=2), OR(a=3, c=4)) — field a in both OR subtrees
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(method='equal', index=0, field='a', 
literals=[1]),
+                        Predicate(method='equal', index=1, field='b', 
literals=[2]),
+                    ],
+                ),
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(method='equal', index=0, field='a', 
literals=[3]),
+                        Predicate(method='equal', index=2, field='c', 
literals=[4]),
+                    ],
+                ),
+            ],
+        )
+
+        evaluator.evaluate(predicate)
+
+        self.assertEqual(max_concurrency_a[0], 1)
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_lazy_result_not_materialized_concurrently(self):
+        import threading
+        import time
+
+        fields = _make_fields()
+
+        concurrency = [0]
+        max_concurrency = [0]
+        lock = threading.Lock()
+
+        class LazyIOReader(GlobalIndexReader):
+            def visit_equal(self, field_ref, literal):
+                def supplier():
+                    with lock:
+                        concurrency[0] += 1
+                        max_concurrency[0] = max(max_concurrency[0], 
concurrency[0])
+                    time.sleep(0.05)
+                    with lock:
+                        concurrency[0] -= 1
+                    return GlobalIndexResult.from_range(Range(1, 3)).results()
+
+                return GlobalIndexResult.create(supplier)
+
+            def close(self):
+                pass
+
+        lazy_reader = LazyIOReader()
+
+        def readers_fn(field):
+            if field.id == 0:
+                return [lazy_reader]
+            return 
[StubGlobalIndexReader(GlobalIndexResult.from_range(Range(1, 5)))]
+
+        executor = ThreadPoolExecutor(max_workers=4)
+        evaluator = GlobalIndexEvaluator(fields, readers_fn, executor)
+
+        # AND(OR(a=1, b=2), OR(a=3, c=4)) — field a in both OR subtrees
+        # lazy results for field a must not be materialized concurrently
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(method='equal', index=0, field='a', 
literals=[1]),
+                        Predicate(method='equal', index=1, field='b', 
literals=[2]),
+                    ],
+                ),
+                Predicate(
+                    method='or', index=None, field=None,
+                    literals=[
+                        Predicate(method='equal', index=0, field='a', 
literals=[3]),
+                        Predicate(method='equal', index=2, field='c', 
literals=[4]),
+                    ],
+                ),
+            ],
+        )
+
+        evaluator.evaluate(predicate)
+
+        self.assertEqual(max_concurrency[0], 1)
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_multiple_readers_per_field_combined_with_and(self):
+        fields = _make_fields()
+        reader_result1 = GlobalIndexResult.from_range(Range(1, 5))
+        reader_result2 = GlobalIndexResult.from_range(Range(3, 7))
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [
+                StubGlobalIndexReader(reader_result1),
+                StubGlobalIndexReader(reader_result2),
+            ],
+            executor,
+        )
+
+        predicate = Predicate(method='equal', index=0, field='a', 
literals=[42])
+        result = evaluator.evaluate(predicate)
+
+        self.assertIsNotNone(result)
+        bm = result.results()
+        # Multiple readers for same field are combined with AND (intersection)
+        self.assertEqual(bm.cardinality(), 3)
+        for v in [3, 4, 5]:
+            self.assertTrue(bm.contains(v))
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_non_field_leaf_predicate_does_not_throw(self):
+        fields = _make_fields()
+        result_a = GlobalIndexResult.from_range(Range(1, 3))
+
+        executor = ThreadPoolExecutor(max_workers=2)
+        evaluator = GlobalIndexEvaluator(
+            fields,
+            lambda field: [StubGlobalIndexReader(result_a)],
+            executor,
+        )
+
+        # AND(non-field leaf, a=1) — non-field leaf has field=None
+        predicate = Predicate(
+            method='and', index=None, field=None,
+            literals=[
+                Predicate(method='alwaysTrue', index=None, field=None, 
literals=[]),
+                Predicate(method='equal', index=0, field='a', literals=[42]),
+            ],
+        )
+
+        result = evaluator.evaluate(predicate)
+
+        # alwaysTrue has no reader support, returns None — AND ignores it
+        self.assertIsNotNone(result)
+        self.assertEqual(result.results().cardinality(), 3)
+        evaluator.close()
+        executor.shutdown(wait=False)
+
+    def test_null_predicate(self):
+        fields = _make_fields()
+        evaluator = GlobalIndexEvaluator(fields, lambda field: [])
+
+        result = evaluator.evaluate(None)
+
+        self.assertIsNone(result)
+        evaluator.close()
+
+
+if __name__ == '__main__':
+    unittest.main()

Reply via email to