This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b26aa9f9e6 [core] Parallelize GlobalIndexEvaluator across multiple
BTree fields (#7951)
b26aa9f9e6 is described below
commit b26aa9f9e6a5aa16590a4b458d824181dd75f0b7
Author: Jingsong Lee <[email protected]>
AuthorDate: Mon May 25 19:01:06 2026 +0800
[core] Parallelize GlobalIndexEvaluator across multiple BTree fields (#7951)
When a compound predicate (AND/OR) involves multiple fields, each
field's BTree index lookup is now submitted to a thread pool and
executed in parallel. This reduces scan latency for multi-field global
index queries.
Key changes:
- GlobalIndexEvaluator accepts an optional ExecutorService and evaluates
child predicates in parallel (visitAsync and its leaf/compound variants);
a null executor falls back to sequential, same-thread execution.
- GlobalIndexScanner creates a dedicated thread pool (GLOBAL-INDEX-POOL)
for the evaluator instead of reusing ManifestReadThreadPool, and
UnionGlobalIndexReader no longer takes its own executor, to avoid
deadlocks.
- Python side uses ThreadPoolExecutor with threading.Lock for
thread-safe reader cache access.
- Added GlobalIndexEvaluatorTest for both Java and Python.
---
.../paimon/globalindex/GlobalIndexEvaluator.java | 170 +++--
.../paimon/globalindex/UnionGlobalIndexReader.java | 28 +-
.../globalindex/GlobalIndexEvaluatorTest.java | 736 +++++++++++++++++++++
.../paimon/globalindex/GlobalIndexScanner.java | 13 +-
.../pypaimon/globalindex/global_index_evaluator.py | 218 ++++--
.../pypaimon/globalindex/global_index_scanner.py | 21 +-
.../pypaimon/tests/global_index_evaluator_test.py | 660 ++++++++++++++++++
7 files changed, 1705 insertions(+), 141 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 720cf63fe4..2f07d97fca 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -23,80 +23,148 @@ import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.LeafPredicate;
import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.IOUtils;
import javax.annotation.Nullable;
import java.io.Closeable;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
+import java.util.Deque;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
+import static
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
/** Predicate for filtering data using global indexes. */
-public class GlobalIndexEvaluator
- implements Closeable, PredicateVisitor<Optional<GlobalIndexResult>> {
+public class GlobalIndexEvaluator implements Closeable {
private final RowType rowType;
private final IntFunction<Collection<GlobalIndexReader>> readersFunction;
- private final Map<Integer, Collection<GlobalIndexReader>>
indexReadersCache = new HashMap<>();
+ private final Map<Integer, Collection<GlobalIndexReader>>
indexReadersCache;
+ private final ExecutorService executorService;
public GlobalIndexEvaluator(
- RowType rowType, IntFunction<Collection<GlobalIndexReader>>
readersFunction) {
+ RowType rowType,
+ IntFunction<Collection<GlobalIndexReader>> readersFunction,
+ @Nullable ExecutorService executorService) {
this.rowType = rowType;
this.readersFunction = readersFunction;
+ this.executorService =
+ executorService == null ? newDirectExecutorService() :
executorService;
+ this.indexReadersCache = new ConcurrentHashMap<>();
}
public Optional<GlobalIndexResult> evaluate(@Nullable Predicate predicate)
{
- return predicate == null ? Optional.empty() : predicate.visit(this);
+ if (predicate == null) {
+ return Optional.empty();
+ }
+ try {
+ return visitAsync(predicate).get();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException("Interrupted during index evaluation",
e);
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof RuntimeException) {
+ throw (RuntimeException) e.getCause();
+ }
+ if (e.getCause() instanceof Error) {
+ throw (Error) e.getCause();
+ }
+ throw new RuntimeException(e.getCause());
+ }
}
- @Override
- public Optional<GlobalIndexResult> visit(LeafPredicate predicate) {
+ private CompletableFuture<Optional<GlobalIndexResult>>
visitAsync(Predicate predicate) {
+ if (predicate instanceof LeafPredicate) {
+ return visitLeafAsync((LeafPredicate) predicate);
+ }
+ return visitCompoundAsync((CompoundPredicate) predicate);
+ }
+
+ private CompletableFuture<Optional<GlobalIndexResult>>
visitLeafAsync(LeafPredicate predicate) {
Optional<FieldRef> fieldRefOptional = predicate.fieldRefOptional();
if (!fieldRefOptional.isPresent()) {
- return Optional.empty();
+ return CompletableFuture.completedFuture(Optional.empty());
}
- Optional<GlobalIndexResult> compoundResult = Optional.empty();
FieldRef fieldRef = fieldRefOptional.get();
int fieldId = rowType.getField(fieldRef.name()).id();
Collection<GlobalIndexReader> readers =
indexReadersCache.computeIfAbsent(fieldId,
readersFunction::apply);
- for (GlobalIndexReader fileIndexReader : readers) {
- Optional<GlobalIndexResult> childResult =
- predicate.function().visit(fileIndexReader, fieldRef,
predicate.literals());
- if (!childResult.isPresent()) {
- continue;
- }
- GlobalIndexResult result = childResult.get();
+ List<CompletableFuture<Optional<GlobalIndexResult>>> readerFutures =
+ new ArrayList<>(readers.size());
+ for (GlobalIndexReader reader : readers) {
+ readerFutures.add(
+ CompletableFuture.supplyAsync(
+ () -> {
+ synchronized (reader) {
+ Optional<GlobalIndexResult> result =
+ predicate
+ .function()
+ .visit(reader, fieldRef,
predicate.literals());
+
result.ifPresent(GlobalIndexResult::results);
+ return result;
+ }
+ },
+ executorService));
+ }
- // AND Operation
- if (compoundResult.isPresent()) {
- GlobalIndexResult r1 = compoundResult.get();
- compoundResult = Optional.of(r1.and(result));
- } else {
- compoundResult = Optional.of(result);
- }
+ return CompletableFuture.allOf(readerFutures.toArray(new
CompletableFuture[0]))
+ .thenApply(
+ v -> {
+ Optional<GlobalIndexResult> compoundResult =
Optional.empty();
+ for
(CompletableFuture<Optional<GlobalIndexResult>> f : readerFutures) {
+ Optional<GlobalIndexResult> childResult =
f.join();
+ if (!childResult.isPresent()) {
+ continue;
+ }
+ if (compoundResult.isPresent()) {
+ compoundResult =
+ Optional.of(
+
compoundResult.get().and(childResult.get()));
+ } else {
+ compoundResult = childResult;
+ }
+ if (compoundResult.get().results().isEmpty()) {
+ return compoundResult;
+ }
+ }
+ return compoundResult;
+ });
+ }
- if (compoundResult.get().results().isEmpty()) {
- return compoundResult;
- }
- }
- return compoundResult;
+ private CompletableFuture<Optional<GlobalIndexResult>> visitCompoundAsync(
+ CompoundPredicate predicate) {
+ List<Predicate> children = flattenChildren(predicate);
+ List<CompletableFuture<Optional<GlobalIndexResult>>> childFutures =
+
children.stream().map(this::visitAsync).collect(Collectors.toList());
+
+ return CompletableFuture.allOf(childFutures.toArray(new
CompletableFuture[0]))
+ .thenApply(
+ v -> {
+ List<Optional<GlobalIndexResult>> results = new
ArrayList<>();
+ for
(CompletableFuture<Optional<GlobalIndexResult>> f : childFutures) {
+ results.add(f.join());
+ }
+ return combineResults(results, predicate);
+ });
}
- @Override
- public Optional<GlobalIndexResult> visit(CompoundPredicate predicate) {
+ private Optional<GlobalIndexResult> combineResults(
+ List<Optional<GlobalIndexResult>> results, CompoundPredicate
predicate) {
if (predicate.function() instanceof Or) {
GlobalIndexResult compoundResult = GlobalIndexResult.createEmpty();
- for (Predicate predicate1 : predicate.children()) {
- Optional<GlobalIndexResult> childResult =
predicate1.visit(this);
-
+ for (Optional<GlobalIndexResult> childResult : results) {
if (!childResult.isPresent()) {
return Optional.empty();
}
@@ -105,21 +173,14 @@ public class GlobalIndexEvaluator
return Optional.of(compoundResult);
} else {
Optional<GlobalIndexResult> compoundResult = Optional.empty();
- for (Predicate predicate1 : predicate.children()) {
- Optional<GlobalIndexResult> childResult =
predicate1.visit(this);
-
- // AND Operation
+ for (Optional<GlobalIndexResult> childResult : results) {
if (childResult.isPresent()) {
if (compoundResult.isPresent()) {
- GlobalIndexResult r1 = compoundResult.get();
- GlobalIndexResult r2 = childResult.get();
- compoundResult = Optional.of(r1.and(r2));
+ compoundResult =
Optional.of(compoundResult.get().and(childResult.get()));
} else {
compoundResult = childResult;
}
}
-
- // if not remain, no need to test anymore
if (compoundResult.isPresent() &&
compoundResult.get().results().isEmpty()) {
return compoundResult;
}
@@ -128,6 +189,27 @@ public class GlobalIndexEvaluator
}
}
+ private List<Predicate> flattenChildren(CompoundPredicate predicate) {
+ List<Predicate> result = new ArrayList<>();
+ Deque<Predicate> stack = new ArrayDeque<>(predicate.children());
+ while (!stack.isEmpty()) {
+ Predicate child = stack.pollFirst();
+ if (child instanceof CompoundPredicate) {
+ CompoundPredicate compound = (CompoundPredicate) child;
+ if (compound.function().equals(predicate.function())) {
+ List<Predicate> grandChildren = compound.children();
+ for (int i = grandChildren.size() - 1; i >= 0; i--) {
+ stack.addFirst(grandChildren.get(i));
+ }
+ continue;
+ }
+ }
+ result.add(child);
+ }
+ return result;
+ }
+
+ @Override
public void close() {
IOUtils.closeAllQuietly(
indexReadersCache.values().stream()
diff --git
a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
index b585a75570..23580b4171 100644
---
a/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/globalindex/UnionGlobalIndexReader.java
@@ -21,20 +21,12 @@ package org.apache.paimon.globalindex;
import org.apache.paimon.predicate.FieldRef;
import org.apache.paimon.predicate.VectorSearch;
-import javax.annotation.Nullable;
-
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Iterator;
import java.util.List;
import java.util.Optional;
-import java.util.concurrent.ExecutorService;
import java.util.function.Function;
import java.util.stream.Collectors;
-import static java.util.Collections.singletonList;
-import static
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialReturn;
-
/**
* A {@link GlobalIndexReader} that combines results from multiple readers by
performing a union
* (OR) operation on their results.
@@ -42,16 +34,9 @@ import static
org.apache.paimon.utils.ThreadPoolUtils.randomlyExecuteSequentialR
public class UnionGlobalIndexReader implements GlobalIndexReader {
private final List<GlobalIndexReader> readers;
- private final @Nullable ExecutorService executor;
public UnionGlobalIndexReader(List<GlobalIndexReader> readers) {
- this(readers, null);
- }
-
- public UnionGlobalIndexReader(
- List<GlobalIndexReader> readers, @Nullable ExecutorService
executor) {
this.readers = readers;
- this.executor = executor;
}
@Override
@@ -163,18 +148,7 @@ public class UnionGlobalIndexReader implements
GlobalIndexReader {
}
private <R> List<R> executeAllReaders(Function<GlobalIndexReader, R>
function) {
- if (executor == null) {
- return readers.stream().map(function).collect(Collectors.toList());
- }
-
- Iterator<R> iterator =
- randomlyExecuteSequentialReturn(
- executor, reader ->
singletonList(function.apply(reader)), readers);
- List<R> result = new ArrayList<>();
- while (iterator.hasNext()) {
- result.add(iterator.next());
- }
- return result;
+ return readers.stream().map(function).collect(Collectors.toList());
}
@Override
diff --git
a/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
new file mode 100644
index 0000000000..0eea0bb02b
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/globalindex/GlobalIndexEvaluatorTest.java
@@ -0,0 +1,736 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex;
+
+import org.apache.paimon.predicate.And;
+import org.apache.paimon.predicate.CompoundPredicate;
+import org.apache.paimon.predicate.FieldRef;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link GlobalIndexEvaluator}. */
+class GlobalIndexEvaluatorTest {
+
+ private ExecutorService executor;
+
+ @AfterEach
+ void tearDown() {
+ if (executor != null) {
+ executor.shutdownNow();
+ }
+ }
+
+ private static RowType rowType() {
+ return new RowType(
+ Arrays.asList(
+ new DataField(0, "a", DataTypes.INT()),
+ new DataField(1, "b", DataTypes.INT()),
+ new DataField(2, "c", DataTypes.INT())));
+ }
+
+ private static GlobalIndexResult resultOf(long... rowIds) {
+ return GlobalIndexResult.create(
+ () -> {
+ RoaringNavigableMap64 bm = new RoaringNavigableMap64();
+ for (long id : rowIds) {
+ bm.add(id);
+ }
+ return bm;
+ });
+ }
+
+ private static GlobalIndexReader readerReturning(GlobalIndexResult result)
{
+ return new StubGlobalIndexReader(result);
+ }
+
+ @Test
+ void testSingleFieldSequential() {
+ RowType rowType = rowType();
+ GlobalIndexResult expected = resultOf(1, 2, 3);
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
Collections.singletonList(readerReturning(expected)),
+ null);
+
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate = builder.equal(0, 42);
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L);
+ evaluator.close();
+ }
+
+ @Test
+ void testAndParallelMultipleFields() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+ GlobalIndexResult resultB = resultOf(3, 4, 5, 6, 7);
+
+ ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new
ConcurrentHashMap<>();
+ fieldResults.put(0, resultA);
+ fieldResults.put(1, resultB);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
+ Collections.singletonList(
+
readerReturning(fieldResults.get(fieldId))),
+ executor);
+
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate = PredicateBuilder.and(builder.equal(0, 42),
builder.equal(1, 99));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ assertBitmapContainsExactly(result.get().results(), 3L, 4L, 5L);
+ evaluator.close();
+ }
+
+ @Test
+ void testOrParallelMultipleFields() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult resultA = resultOf(1, 2);
+ GlobalIndexResult resultB = resultOf(3, 4);
+
+ ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new
ConcurrentHashMap<>();
+ fieldResults.put(0, resultA);
+ fieldResults.put(1, resultB);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
+ Collections.singletonList(
+
readerReturning(fieldResults.get(fieldId))),
+ executor);
+
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate = PredicateBuilder.or(builder.equal(0, 42),
builder.equal(1, 99));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L, 4L);
+ evaluator.close();
+ }
+
+ @Test
+ void testOrReturnsEmptyWhenChildUnsupported() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult resultA = resultOf(1, 2);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId -> {
+ if (fieldId == 0) {
+ return
Collections.singletonList(readerReturning(resultA));
+ }
+ return Collections.emptyList();
+ },
+ executor);
+
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate = PredicateBuilder.or(builder.equal(0, 42),
builder.equal(1, 99));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isEmpty();
+ evaluator.close();
+ }
+
+ @Test
+ void testAndWithEmptyResultShortCircuits() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult resultA = resultOf(1, 2, 3);
+ GlobalIndexResult resultB = resultOf(10, 11);
+
+ ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new
ConcurrentHashMap<>();
+ fieldResults.put(0, resultA);
+ fieldResults.put(1, resultB);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
+ Collections.singletonList(
+
readerReturning(fieldResults.get(fieldId))),
+ executor);
+
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate = PredicateBuilder.and(builder.equal(0, 42),
builder.equal(1, 99));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ assertThat(result.get().results().isEmpty()).isTrue();
+ evaluator.close();
+ }
+
+ @Test
+ void testParallelUsesMultipleThreads() {
+ executor = Executors.newFixedThreadPool(3);
+ RowType rowType = rowType();
+
+ ConcurrentHashMap<String, Boolean> threadNames = new
ConcurrentHashMap<>();
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
+ Collections.singletonList(
+ new
StubGlobalIndexReader(resultOf(fieldId, fieldId + 10)) {
+ @Override
+ public Optional<GlobalIndexResult>
visitEqual(
+ FieldRef fieldRef, Object
literal) {
+ threadNames.put(
+
Thread.currentThread().getName(), true);
+ return
super.visitEqual(fieldRef, literal);
+ }
+ }),
+ executor);
+
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ PredicateBuilder.and(builder.equal(0, 1), builder.equal(1, 2),
builder.equal(2, 3));
+
+ evaluator.evaluate(predicate);
+
+ assertThat(threadNames.size()).isGreaterThan(1);
+ evaluator.close();
+ }
+
+ @Test
+ void testNullExecutorFallsBackToSequential() {
+ RowType rowType = rowType();
+
+ AtomicInteger callCount = new AtomicInteger();
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId -> {
+ callCount.incrementAndGet();
+ return Collections.singletonList(
+ readerReturning(resultOf(fieldId, fieldId
+ 10)));
+ },
+ null);
+
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate = PredicateBuilder.and(builder.equal(0, 1),
builder.equal(1, 2));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ assertThat(callCount.get()).isEqualTo(2);
+ evaluator.close();
+ }
+
+ @Test
+ void testNestedAndPredicateDoesNotDeadlockWithSmallPool() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+ GlobalIndexResult resultB = resultOf(3, 4, 5, 6, 7);
+ GlobalIndexResult resultC = resultOf(4, 5, 8, 9);
+
+ ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new
ConcurrentHashMap<>();
+ fieldResults.put(0, resultA);
+ fieldResults.put(1, resultB);
+ fieldResults.put(2, resultC);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
+ Collections.singletonList(
+
readerReturning(fieldResults.get(fieldId))),
+ executor);
+
+ // and(a, b, c) builds as and(and(a, b), c) — nested binary tree
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ PredicateBuilder.and(builder.equal(0, 1), builder.equal(1, 2),
builder.equal(2, 3));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ // intersection of {1..5}, {3..7}, {4,5,8,9} -> {4,5}
+ assertBitmapContainsExactly(result.get().results(), 4L, 5L);
+ evaluator.close();
+ }
+
+ @Test
+ void testNestedOrPredicateDoesNotDeadlockWithSmallPool() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult resultA = resultOf(1, 2);
+ GlobalIndexResult resultB = resultOf(3, 4);
+ GlobalIndexResult resultC = resultOf(5, 6);
+
+ ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new
ConcurrentHashMap<>();
+ fieldResults.put(0, resultA);
+ fieldResults.put(1, resultB);
+ fieldResults.put(2, resultC);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
+ Collections.singletonList(
+
readerReturning(fieldResults.get(fieldId))),
+ executor);
+
+ // or(a, b, c) builds as or(or(a, b), c) — nested binary tree
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ PredicateBuilder.or(builder.equal(0, 1), builder.equal(1, 2),
builder.equal(2, 3));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ // union of {1,2}, {3,4}, {5,6}
+ assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L, 4L,
5L, 6L);
+ evaluator.close();
+ }
+
+ @Test
+ void testMixedNestedPredicateDoesNotDeadlockWithSmallPool() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+ GlobalIndexResult resultB = resultOf(3, 4, 5, 6, 7);
+ GlobalIndexResult resultC = resultOf(1, 2, 3, 10, 11);
+ // Field c used for second OR child - distinct from field a/b
+
+ ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new
ConcurrentHashMap<>();
+ fieldResults.put(0, resultA);
+ fieldResults.put(1, resultB);
+ fieldResults.put(2, resultC);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
+ Collections.singletonList(
+
readerReturning(fieldResults.get(fieldId))),
+ executor);
+
+ // AND(OR(a, b), OR(a, c)) — mixed nesting, different compound types
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ PredicateBuilder.and(
+ PredicateBuilder.or(builder.equal(0, 1),
builder.equal(1, 2)),
+ PredicateBuilder.or(builder.equal(0, 3),
builder.equal(2, 4)));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ // OR(a, b) = union({1..5}, {3..7}) = {1..7}
+ // OR(a, c) = union({1..5}, {1,2,3,10,11}) = {1,2,3,4,5,10,11}
+ // AND = intersection = {1,2,3,4,5}
+ assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L, 4L,
5L);
+ evaluator.close();
+ }
+
+ @Test
+ void testDeepMixedNestedPredicateDoesNotDeadlockWithSmallPool() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult resultA = resultOf(1, 2, 3, 4, 5);
+ GlobalIndexResult resultB = resultOf(2, 3, 4, 5, 6);
+ GlobalIndexResult resultC = resultOf(3, 4, 5, 6, 7);
+
+ ConcurrentHashMap<Integer, GlobalIndexResult> fieldResults = new
ConcurrentHashMap<>();
+ fieldResults.put(0, resultA);
+ fieldResults.put(1, resultB);
+ fieldResults.put(2, resultC);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
+ Collections.singletonList(
+
readerReturning(fieldResults.get(fieldId))),
+ executor);
+
+ // AND(OR(AND(a, b), c), OR(AND(a, c), b)) — deep mixed nesting
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ PredicateBuilder.and(
+ PredicateBuilder.or(
+ PredicateBuilder.and(builder.equal(0, 1),
builder.equal(1, 2)),
+ builder.equal(2, 3)),
+ PredicateBuilder.or(
+ PredicateBuilder.and(builder.equal(0, 4),
builder.equal(2, 5)),
+ builder.equal(1, 6)));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ // OR(AND(a,b), c): AND(a,b)={2,3,4,5}, c={3..7} => union={2,3,4,5,6,7}
+ // OR(AND(a,c), b): AND(a,c)={3,4,5}, b={2..6} => union={2,3,4,5,6}
+ // top AND: intersection = {2,3,4,5,6}
+ assertBitmapContainsExactly(result.get().results(), 2L, 3L, 4L, 5L,
6L);
+ evaluator.close();
+ }
+
+ @Test
+ void testSameFieldPredicatesNotAccessedConcurrently() {
+ executor = Executors.newFixedThreadPool(4);
+ RowType rowType = rowType();
+
+ AtomicInteger concurrency = new AtomicInteger(0);
+ AtomicInteger maxConcurrency = new AtomicInteger(0);
+
+ GlobalIndexReader concurrencyDetectingReader =
+ new StubGlobalIndexReader(resultOf(1, 2, 3, 4, 5)) {
+ @Override
+ public Optional<GlobalIndexResult> visitEqual(
+ FieldRef fieldRef, Object literal) {
+ int c = concurrency.incrementAndGet();
+ maxConcurrency.updateAndGet(cur -> Math.max(cur, c));
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ concurrency.decrementAndGet();
+ return super.visitEqual(fieldRef, literal);
+ }
+ };
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
Collections.singletonList(concurrencyDetectingReader),
+ executor);
+
+ // AND(a=1, a=2, a=3) — all same field, must not run concurrently
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ PredicateBuilder.and(builder.equal(0, 1), builder.equal(0, 2),
builder.equal(0, 3));
+
+ evaluator.evaluate(predicate);
+
+ assertThat(maxConcurrency.get()).isEqualTo(1);
+ evaluator.close();
+ }
+
+ @Test
+ void testMixedNestedSameFieldNotAccessedConcurrently() {
+ executor = Executors.newFixedThreadPool(4);
+ RowType rowType = rowType();
+
+ AtomicInteger concurrencyA = new AtomicInteger(0);
+ AtomicInteger maxConcurrencyA = new AtomicInteger(0);
+
+ GlobalIndexReader concurrencyDetectingReaderA =
+ new StubGlobalIndexReader(resultOf(1, 2, 3, 4, 5)) {
+ @Override
+ public Optional<GlobalIndexResult> visitEqual(
+ FieldRef fieldRef, Object literal) {
+ int c = concurrencyA.incrementAndGet();
+ maxConcurrencyA.updateAndGet(cur -> Math.max(cur, c));
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ concurrencyA.decrementAndGet();
+ return super.visitEqual(fieldRef, literal);
+ }
+ };
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId -> {
+ if (fieldId == 0) {
+ return
Collections.singletonList(concurrencyDetectingReaderA);
+ }
+ return Collections.singletonList(
+ readerReturning(resultOf(1, 2, 3, 4, 5)));
+ },
+ executor);
+
+ // AND(OR(a=1, b=2), OR(a=3, c=4)) — field a appears in both OR
subtrees
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ PredicateBuilder.and(
+ PredicateBuilder.or(builder.equal(0, 1),
builder.equal(1, 2)),
+ PredicateBuilder.or(builder.equal(0, 3),
builder.equal(2, 4)));
+
+ evaluator.evaluate(predicate);
+
+ assertThat(maxConcurrencyA.get()).isEqualTo(1);
+ evaluator.close();
+ }
+
+ @Test
+ void testLazyResultNotMaterializedConcurrently() {
+ executor = Executors.newFixedThreadPool(4);
+ RowType rowType = rowType();
+
+ AtomicInteger concurrency = new AtomicInteger(0);
+ AtomicInteger maxConcurrency = new AtomicInteger(0);
+
+ GlobalIndexReader lazyReader =
+ new StubGlobalIndexReader(null) {
+ @Override
+ public Optional<GlobalIndexResult> visitEqual(
+ FieldRef fieldRef, Object literal) {
+ return Optional.of(
+ GlobalIndexResult.create(
+ () -> {
+ int c =
concurrency.incrementAndGet();
+ maxConcurrency.updateAndGet(cur ->
Math.max(cur, c));
+ try {
+ Thread.sleep(50);
+ } catch (InterruptedException e) {
+
Thread.currentThread().interrupt();
+ }
+ concurrency.decrementAndGet();
+ RoaringNavigableMap64 bm = new
RoaringNavigableMap64();
+ bm.add(1);
+ bm.add(2);
+ bm.add(3);
+ return bm;
+ }));
+ }
+ };
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId -> {
+ if (fieldId == 0) {
+ return Collections.singletonList(lazyReader);
+ }
+ return Collections.singletonList(
+ readerReturning(resultOf(1, 2, 3, 4, 5)));
+ },
+ executor);
+
+ // AND(OR(a=1, b=2), OR(a=3, c=4)) — field a in both OR subtrees
+ // lazy results for field a must not be materialized concurrently
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ PredicateBuilder.and(
+ PredicateBuilder.or(builder.equal(0, 1),
builder.equal(1, 2)),
+ PredicateBuilder.or(builder.equal(0, 3),
builder.equal(2, 4)));
+
+ evaluator.evaluate(predicate);
+
+ assertThat(maxConcurrency.get()).isEqualTo(1);
+ evaluator.close();
+ }
+
+ @Test
+ void testMultipleReadersPerFieldCombinedWithAnd() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult readerResult1 = resultOf(1, 2, 3, 4, 5);
+ GlobalIndexResult readerResult2 = resultOf(3, 4, 5, 6, 7);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
+ Arrays.asList(
+ readerReturning(readerResult1),
+ readerReturning(readerResult2)),
+ executor);
+
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate = builder.equal(0, 42);
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ // Multiple readers for same field are combined with AND (intersection)
+ assertBitmapContainsExactly(result.get().results(), 3L, 4L, 5L);
+ evaluator.close();
+ }
+
+ @Test
+ void testNonFieldLeafPredicateDoesNotThrow() {
+ executor = Executors.newFixedThreadPool(2);
+ RowType rowType = rowType();
+
+ GlobalIndexResult resultA = resultOf(1, 2, 3);
+
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(
+ rowType,
+ fieldId ->
Collections.singletonList(readerReturning(resultA)),
+ executor);
+
+ // Manually build AND(alwaysTrue, a=42) to bypass PredicateBuilder
simplification
+ PredicateBuilder builder = new PredicateBuilder(rowType);
+ Predicate predicate =
+ new CompoundPredicate(
+ And.INSTANCE,
+ Arrays.asList(PredicateBuilder.alwaysTrue(),
builder.equal(0, 42)));
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(predicate);
+
+ assertThat(result).isPresent();
+ assertBitmapContainsExactly(result.get().results(), 1L, 2L, 3L);
+ evaluator.close();
+ }
+
+ @Test
+ void testNullPredicate() {
+ RowType rowType = rowType();
+ GlobalIndexEvaluator evaluator =
+ new GlobalIndexEvaluator(rowType, fieldId ->
Collections.emptyList(), null);
+
+ Optional<GlobalIndexResult> result = evaluator.evaluate(null);
+
+ assertThat(result).isEmpty();
+ evaluator.close();
+ }
+
+ private static void assertBitmapContainsExactly(
+ RoaringNavigableMap64 bitmap, long... expected) {
+ assertThat(bitmap.getLongCardinality()).isEqualTo(expected.length);
+ for (long val : expected) {
+ assertThat(bitmap.contains(val)).isTrue();
+ }
+ }
+
+ private static class StubGlobalIndexReader implements GlobalIndexReader {
+
+ private final GlobalIndexResult result;
+
+ StubGlobalIndexReader(GlobalIndexResult result) {
+ this.result = result;
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitEqual(FieldRef fieldRef,
Object literal) {
+ return Optional.ofNullable(result);
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitIsNotNull(FieldRef fieldRef) {
+ return Optional.ofNullable(result);
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitIsNull(FieldRef fieldRef) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitStartsWith(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitEndsWith(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitContains(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitLike(FieldRef fieldRef, Object
literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitLessThan(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitGreaterOrEqual(FieldRef
fieldRef, Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitNotEqual(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitLessOrEqual(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitGreaterThan(FieldRef fieldRef,
Object literal) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitIn(FieldRef fieldRef,
List<Object> literals) {
+ return Optional.empty();
+ }
+
+ @Override
+ public Optional<GlobalIndexResult> visitNotIn(FieldRef fieldRef,
List<Object> literals) {
+ return Optional.empty();
+ }
+
+ @Override
+ public void close() {}
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
index 3092f1b3da..3e59138019 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexScanner.java
@@ -32,7 +32,6 @@ import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.ManifestReadThreadPool;
import org.apache.paimon.utils.Range;
import java.io.Closeable;
@@ -54,6 +53,7 @@ import static
org.apache.paimon.CoreOptions.GLOBAL_INDEX_THREAD_NUM;
import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
import static
org.apache.paimon.table.source.snapshot.TimeTravelUtil.tryTravelOrLatest;
import static org.apache.paimon.utils.Preconditions.checkNotNull;
+import static org.apache.paimon.utils.ThreadPoolUtils.createCachedThreadPool;
/** Scanner for shard-based global indexes. */
public class GlobalIndexScanner implements Closeable {
@@ -70,8 +70,10 @@ public class GlobalIndexScanner implements Closeable {
IndexPathFactory indexPathFactory,
Collection<IndexFileMeta> indexFiles) {
this.options = options;
- this.executor =
-
ManifestReadThreadPool.getExecutorService(options.get(GLOBAL_INDEX_THREAD_NUM));
+ Integer threadNum = options.get(GLOBAL_INDEX_THREAD_NUM);
+ int parallelism =
+ threadNum != null ? threadNum :
Runtime.getRuntime().availableProcessors();
+ this.executor = createCachedThreadPool(parallelism,
"GLOBAL-INDEX-POOL");
this.indexPathFactory = indexPathFactory;
GlobalIndexFileReader indexFileReader = meta ->
fileIO.newInputStream(meta.filePath());
Map<Integer, Map<String, Map<Range, List<IndexFileMeta>>>> indexMetas
= new HashMap<>();
@@ -94,7 +96,7 @@ public class GlobalIndexScanner implements Closeable {
indexFileReader,
indexMetas.get(fieldId),
rowType.getField(fieldId));
- this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType,
readersFunction);
+ this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType,
readersFunction, executor);
}
public static Optional<GlobalIndexScanner> create(
@@ -176,7 +178,7 @@ public class GlobalIndexScanner implements Closeable {
unionReader.add(innerReader);
}
- readers.add(new UnionGlobalIndexReader(unionReader, executor));
+ readers.add(new UnionGlobalIndexReader(unionReader));
}
} catch (IOException e) {
throw new RuntimeException("Failed to create global index reader",
e);
@@ -195,5 +197,6 @@ public class GlobalIndexScanner implements Closeable {
@Override
public void close() throws IOException {
globalIndexEvaluator.close();
+ executor.shutdownNow();
}
}
diff --git a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
index bfd3c31709..f399b84de6 100644
--- a/paimon-python/pypaimon/globalindex/global_index_evaluator.py
+++ b/paimon-python/pypaimon/globalindex/global_index_evaluator.py
@@ -17,6 +17,9 @@
"""Global index evaluator for filtering data using global indexes."""
+import threading
+from collections import deque
+from concurrent.futures import Executor, Future
from typing import Callable, Collection, Dict, List, Optional
from pypaimon.globalindex.global_index_reader import GlobalIndexReader,
FieldRef
@@ -25,108 +28,204 @@ from pypaimon.common.predicate import Predicate
from pypaimon.schema.data_types import DataField
+class _DirectExecutor(Executor):
+ """Executor that runs callables in the calling thread."""
+
+ def submit(self, fn, *args, **kwargs):
+ f = Future()
+ try:
+ result = fn(*args, **kwargs)
+ f.set_result(result)
+ except Exception as e:
+ f.set_exception(e)
+ return f
+
+ def shutdown(self, wait=True):
+ pass
+
+
class GlobalIndexEvaluator:
- """
- Predicate evaluator for filtering data using global indexes.
- """
+ """Predicate evaluator for filtering data using global indexes."""
def __init__(
self,
fields: List[DataField],
- readers_function: Callable[[DataField], Collection[GlobalIndexReader]]
+ readers_function: Callable[[DataField], Collection[GlobalIndexReader]],
+ executor: Optional[Executor] = None,
):
self._fields = fields
self._field_by_name = {f.name: f for f in fields}
self._readers_function = readers_function
self._index_readers_cache: Dict[int, Collection[GlobalIndexReader]] =
{}
+ self._reader_locks: Dict[int, threading.Lock] = {}
+ self._locks_lock = threading.Lock()
+ self._executor = executor if executor is not None else
_DirectExecutor()
def evaluate(
self,
predicate: Optional[Predicate]
) -> Optional[GlobalIndexResult]:
- compound_result: Optional[GlobalIndexResult] = None
-
- # Evaluate predicate first
- if predicate is not None:
- compound_result = self._visit_predicate(predicate)
-
- return compound_result
+ if predicate is None:
+ return None
+ future = self._visit_async(predicate)
+ return future.result()
- def _visit_predicate(self, predicate: Predicate) ->
Optional[GlobalIndexResult]:
- """Visit a predicate and return the index result."""
- if predicate.method == 'and':
- compound_result: Optional[GlobalIndexResult] = None
- for child in predicate.literals:
- child_result = self._visit_predicate(child)
-
- if child_result is not None:
- if compound_result is not None:
- compound_result = compound_result.and_(child_result)
- else:
- compound_result = child_result
-
- if compound_result is not None and compound_result.is_empty():
- return compound_result
-
- return compound_result
-
- elif predicate.method == 'or':
- compound_result = GlobalIndexResult.create_empty()
- for child in predicate.literals:
- child_result = self._visit_predicate(child)
-
- if child_result is None:
- return None
-
- compound_result = compound_result.or_(child_result)
-
- return compound_result
-
- else:
- # Leaf predicate
- return self._visit_leaf_predicate(predicate)
+ def _visit_async(self, predicate) -> Future:
+ if isinstance(predicate, Predicate) and predicate.method in ('and',
'or'):
+ return self._visit_compound_async(predicate)
+ return self._visit_leaf_async(predicate)
- def _visit_leaf_predicate(self, predicate: Predicate) ->
Optional[GlobalIndexResult]:
- """Visit a leaf predicate and return the index result."""
+ def _visit_leaf_async(self, predicate: Predicate) -> Future:
field = self._field_by_name.get(predicate.field)
if field is None:
- return None
-
+ f = Future()
+ f.set_result(None)
+ return f
+
field_id = field.id
readers = self._index_readers_cache.get(field_id)
if readers is None:
readers = self._readers_function(field)
self._index_readers_cache[field_id] = readers
-
+
field_ref = FieldRef(predicate.index, predicate.field, str(field.type))
-
- compound_result: Optional[GlobalIndexResult] = None
-
+
+ reader_futures = []
for reader in readers:
- child_result = self._visit_function(reader, predicate, field_ref)
+ lock = self._get_reader_lock(id(reader))
+ reader_futures.append(
+ self._executor.submit(
+ self._visit_reader, reader, predicate, field_ref, lock
+ )
+ )
+
+ all_done = Future()
+ if not reader_futures:
+ all_done.set_result(None)
+ return all_done
+
+ remaining = [len(reader_futures)]
+ count_lock = threading.Lock()
+
+ def on_done(_):
+ with count_lock:
+ remaining[0] -= 1
+ if remaining[0] == 0:
+ try:
+ all_done.set_result(
+ self._combine_reader_results(reader_futures)
+ )
+ except Exception as e:
+ all_done.set_exception(e)
+
+ for rf in reader_futures:
+ rf.add_done_callback(on_done)
+
+ return all_done
+
+ def _visit_reader(self, reader, predicate, field_ref, lock):
+ with lock:
+ result = self._visit_function(reader, predicate, field_ref)
+ if result is not None:
+ result.results()
+ return result
+
+ def _combine_reader_results(
+ self, reader_futures: List[Future]
+ ) -> Optional[GlobalIndexResult]:
+ compound_result: Optional[GlobalIndexResult] = None
+ for f in reader_futures:
+ child_result = f.result()
if child_result is None:
continue
-
if compound_result is not None:
- compound_result = compound_result.or_(child_result)
+ compound_result = compound_result.and_(child_result)
else:
compound_result = child_result
-
if compound_result.is_empty():
return compound_result
-
return compound_result
+ def _visit_compound_async(self, predicate: Predicate) -> Future:
+ children = self._flatten_children(predicate.method, predicate.literals)
+ child_futures = [self._visit_async(child) for child in children]
+
+ all_done = Future()
+ if not child_futures:
+ all_done.set_result(None)
+ return all_done
+
+ remaining = [len(child_futures)]
+ lock = threading.Lock()
+
+ def on_done(_):
+ with lock:
+ remaining[0] -= 1
+ if remaining[0] == 0:
+ try:
+ results = [f.result() for f in child_futures]
+ all_done.set_result(
+ self._combine_results(results, predicate.method)
+ )
+ except Exception as e:
+ all_done.set_exception(e)
+
+ for cf in child_futures:
+ cf.add_done_callback(on_done)
+
+ return all_done
+
+ def _combine_results(
+ self, results: List[Optional[GlobalIndexResult]], method: str
+ ) -> Optional[GlobalIndexResult]:
+ if method == 'or':
+ compound_result = GlobalIndexResult.create_empty()
+ for child_result in results:
+ if child_result is None:
+ return None
+ compound_result = compound_result.or_(child_result)
+ return compound_result
+ else:
+ compound_result: Optional[GlobalIndexResult] = None
+ for child_result in results:
+ if child_result is not None:
+ if compound_result is not None:
+ compound_result = compound_result.and_(child_result)
+ else:
+ compound_result = child_result
+ if compound_result is not None and compound_result.is_empty():
+ return compound_result
+ return compound_result
+
+ def _flatten_children(self, method: str, children) -> list:
+ result = []
+ stack = deque(children)
+ while stack:
+ child = stack.popleft()
+ if isinstance(child, Predicate) and child.method == method:
+ for grandchild in reversed(child.literals):
+ stack.appendleft(grandchild)
+ else:
+ result.append(child)
+ return result
+
+ def _get_reader_lock(self, reader_id: int) -> threading.Lock:
+ with self._locks_lock:
+ lock = self._reader_locks.get(reader_id)
+ if lock is None:
+ lock = threading.Lock()
+ self._reader_locks[reader_id] = lock
+ return lock
+
def _visit_function(
self,
reader: GlobalIndexReader,
predicate: Predicate,
field_ref: FieldRef
) -> Optional[GlobalIndexResult]:
- """Visit a predicate function with the given reader."""
method = predicate.method
literals = predicate.literals
-
+
if method == 'equal':
return reader.visit_equal(field_ref, literals[0])
elif method == 'notEqual':
@@ -161,7 +260,6 @@ class GlobalIndexEvaluator:
return None
def close(self) -> None:
- """Close the evaluator and release resources."""
for readers in self._index_readers_cache.values():
for reader in readers:
try:
diff --git a/paimon-python/pypaimon/globalindex/global_index_scanner.py
b/paimon-python/pypaimon/globalindex/global_index_scanner.py
index bba5108466..e8e34f641a 100644
--- a/paimon-python/pypaimon/globalindex/global_index_scanner.py
+++ b/paimon-python/pypaimon/globalindex/global_index_scanner.py
@@ -17,6 +17,8 @@
"""Scanner for shard-based global indexes."""
+import os
+from concurrent.futures import ThreadPoolExecutor
from typing import Collection, Optional
from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
@@ -37,9 +39,15 @@ class GlobalIndexScanner:
fields: list,
file_io,
index_path: str,
- index_files: Collection['IndexFileMeta']
+ index_files: Collection['IndexFileMeta'],
+ thread_num: Optional[int] = None,
):
- self._evaluator = self._create_evaluator(fields, file_io, index_path,
index_files)
+ self._executor = ThreadPoolExecutor(
+ max_workers=thread_num or os.cpu_count() or 4
+ )
+ self._evaluator = self._create_evaluator(
+ fields, file_io, index_path, index_files
+ )
def _create_evaluator(self, fields, file_io, index_path, index_files):
index_metas = {}
@@ -71,7 +79,7 @@ class GlobalIndexScanner:
def readers_function(field: DataField) ->
Collection[GlobalIndexReader]:
return _create_readers(file_io, index_path,
index_metas.get(field.id), field)
- return GlobalIndexEvaluator(fields, readers_function)
+ return GlobalIndexEvaluator(fields, readers_function, self._executor)
@staticmethod
def create(table, index_files=None, partition_filter=None, predicate=None)
-> Optional['GlobalIndexScanner']:
@@ -90,7 +98,8 @@ class GlobalIndexScanner:
fields=table.fields,
file_io=table.file_io,
index_path=table.path_factory().global_index_path_factory().index_path(),
- index_files=index_files
+ index_files=index_files,
+ thread_num=table.options.global_index_thread_num(),
)
# Scan index files from snapshot using partition_filter and predicate
@@ -121,7 +130,8 @@ class GlobalIndexScanner:
fields=table.fields,
file_io=table.file_io,
index_path=table.path_factory().global_index_path_factory().index_path(),
- index_files=scanned_index_files
+ index_files=scanned_index_files,
+ thread_num=table.options.global_index_thread_num(),
)
def scan(self, predicate: Optional[Predicate]) ->
Optional[GlobalIndexResult]:
@@ -131,6 +141,7 @@ class GlobalIndexScanner:
def close(self):
"""Close the scanner and release resources."""
self._evaluator.close()
+ self._executor.shutdown(wait=False)
def __enter__(self) -> 'GlobalIndexScanner':
return self
diff --git a/paimon-python/pypaimon/tests/global_index_evaluator_test.py
b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
new file mode 100644
index 0000000000..3017835d78
--- /dev/null
+++ b/paimon-python/pypaimon/tests/global_index_evaluator_test.py
@@ -0,0 +1,660 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from concurrent.futures import ThreadPoolExecutor
+
+from pypaimon.common.predicate import Predicate
+from pypaimon.globalindex.global_index_evaluator import GlobalIndexEvaluator
+from pypaimon.globalindex.global_index_reader import GlobalIndexReader
+from pypaimon.globalindex.global_index_result import GlobalIndexResult
+from pypaimon.schema.data_types import DataField, AtomicType
+from pypaimon.utils.range import Range
+
+
+class StubGlobalIndexReader(GlobalIndexReader):
+ """A test reader that returns a fixed result for equal predicates."""
+
+ def __init__(self, result):
+ self._result = result
+
+ def visit_equal(self, field_ref, literal):
+ return self._result
+
+ def close(self):
+ pass
+
+
+def _make_fields():
+ return [
+ DataField(0, "a", AtomicType("INT")),
+ DataField(1, "b", AtomicType("INT")),
+ DataField(2, "c", AtomicType("INT")),
+ ]
+
+
+def _result_of(*row_ids):
+ return GlobalIndexResult.from_range(Range(min(row_ids), max(row_ids)))
+
+
+class GlobalIndexEvaluatorTest(unittest.TestCase):
+
+ def test_single_field_sequential(self):
+ fields = _make_fields()
+ expected = GlobalIndexResult.from_range(Range(1, 3))
+
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [StubGlobalIndexReader(expected)],
+ )
+
+ predicate = Predicate(method='equal', index=0, field='a',
literals=[42])
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ self.assertEqual(result.results().cardinality(), 3)
+ evaluator.close()
+
+ def test_and_parallel_multiple_fields(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 5))
+ result_b = GlobalIndexResult.from_range(Range(3, 7))
+
+ field_results = {0: result_a, 1: result_b}
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [StubGlobalIndexReader(field_results[field.id])],
+ executor,
+ )
+
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a', literals=[42]),
+ Predicate(method='equal', index=1, field='b', literals=[99]),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ bm = result.results()
+ # intersection of [1,5] and [3,7] -> [3,5]
+ self.assertEqual(bm.cardinality(), 3)
+ for v in [3, 4, 5]:
+ self.assertTrue(bm.contains(v))
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_or_parallel_multiple_fields(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 2))
+ result_b = GlobalIndexResult.from_range(Range(10, 11))
+
+ field_results = {0: result_a, 1: result_b}
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [StubGlobalIndexReader(field_results[field.id])],
+ executor,
+ )
+
+ predicate = Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a', literals=[42]),
+ Predicate(method='equal', index=1, field='b', literals=[99]),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ bm = result.results()
+ # union of [1,2] and [10,11] -> {1,2,10,11}
+ self.assertEqual(bm.cardinality(), 4)
+ for v in [1, 2, 10, 11]:
+ self.assertTrue(bm.contains(v))
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_or_returns_none_when_child_unsupported(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 2))
+
+ def readers_fn(field):
+ if field.id == 0:
+ return [StubGlobalIndexReader(result_a)]
+ return []
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(fields, readers_fn, executor)
+
+ predicate = Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a', literals=[42]),
+ Predicate(method='equal', index=1, field='b', literals=[99]),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNone(result)
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_and_with_disjoint_results(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 3))
+ result_b = GlobalIndexResult.from_range(Range(10, 12))
+
+ field_results = {0: result_a, 1: result_b}
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [StubGlobalIndexReader(field_results[field.id])],
+ executor,
+ )
+
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a', literals=[42]),
+ Predicate(method='equal', index=1, field='b', literals=[99]),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ self.assertTrue(result.is_empty())
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_no_executor_falls_back_to_sequential(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 5))
+ result_b = GlobalIndexResult.from_range(Range(3, 7))
+
+ field_results = {0: result_a, 1: result_b}
+ call_count = [0]
+
+ def readers_fn(field):
+ call_count[0] += 1
+ return [StubGlobalIndexReader(field_results[field.id])]
+
+ evaluator = GlobalIndexEvaluator(fields, readers_fn)
+
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a', literals=[42]),
+ Predicate(method='equal', index=1, field='b', literals=[99]),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ self.assertEqual(call_count[0], 2)
+ evaluator.close()
+
+ def test_nested_and_does_not_deadlock_with_small_pool(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 5))
+ result_b = GlobalIndexResult.from_range(Range(3, 7))
+ result_c = GlobalIndexResult.from_range(Range(4, 5))
+
+ field_results = {0: result_a, 1: result_b, 2: result_c}
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [StubGlobalIndexReader(field_results[field.id])],
+ executor,
+ )
+
+ # Nested binary tree: and(and(a, b), c)
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[1]),
+ Predicate(method='equal', index=1, field='b',
literals=[2]),
+ ],
+ ),
+ Predicate(method='equal', index=2, field='c', literals=[3]),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ bm = result.results()
+ # intersection of [1,5], [3,7], [4,5] -> [4,5]
+ self.assertEqual(bm.cardinality(), 2)
+ for v in [4, 5]:
+ self.assertTrue(bm.contains(v))
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_nested_or_does_not_deadlock_with_small_pool(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 2))
+ result_b = GlobalIndexResult.from_range(Range(3, 4))
+ result_c = GlobalIndexResult.from_range(Range(5, 6))
+
+ field_results = {0: result_a, 1: result_b, 2: result_c}
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [StubGlobalIndexReader(field_results[field.id])],
+ executor,
+ )
+
+ # Nested binary tree: or(or(a, b), c)
+ predicate = Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[1]),
+ Predicate(method='equal', index=1, field='b',
literals=[2]),
+ ],
+ ),
+ Predicate(method='equal', index=2, field='c', literals=[3]),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ bm = result.results()
+ # union of [1,2], [3,4], [5,6] -> {1,2,3,4,5,6}
+ self.assertEqual(bm.cardinality(), 6)
+ for v in [1, 2, 3, 4, 5, 6]:
+ self.assertTrue(bm.contains(v))
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_mixed_nested_does_not_deadlock_with_small_pool(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 5))
+ result_b = GlobalIndexResult.from_range(Range(3, 7))
+ result_c = GlobalIndexResult.from_range(Range(1, 3))
+
+ field_results = {0: result_a, 1: result_b, 2: result_c}
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [StubGlobalIndexReader(field_results[field.id])],
+ executor,
+ )
+
+ # AND(OR(a, b), OR(a, c)) — mixed nesting
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[1]),
+ Predicate(method='equal', index=1, field='b',
literals=[2]),
+ ],
+ ),
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[3]),
+ Predicate(method='equal', index=2, field='c',
literals=[4]),
+ ],
+ ),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ bm = result.results()
+ # OR(a, b) = union([1,5], [3,7]) = [1,7]
+ # OR(a, c) = union([1,5], [1,3]) = [1,5]
+ # AND = intersection = [1,5]
+ self.assertEqual(bm.cardinality(), 5)
+ for v in [1, 2, 3, 4, 5]:
+ self.assertTrue(bm.contains(v))
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_deep_mixed_nested_does_not_deadlock_with_small_pool(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 5))
+ result_b = GlobalIndexResult.from_range(Range(2, 6))
+ result_c = GlobalIndexResult.from_range(Range(3, 7))
+
+ field_results = {0: result_a, 1: result_b, 2: result_c}
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [StubGlobalIndexReader(field_results[field.id])],
+ executor,
+ )
+
+ # AND(OR(AND(a, b), c), OR(AND(a, c), b)) — deep mixed nesting
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[1]),
+ Predicate(method='equal', index=1, field='b',
literals=[2]),
+ ],
+ ),
+ Predicate(method='equal', index=2, field='c',
literals=[3]),
+ ],
+ ),
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[4]),
+ Predicate(method='equal', index=2, field='c',
literals=[5]),
+ ],
+ ),
+ Predicate(method='equal', index=1, field='b',
literals=[6]),
+ ],
+ ),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ bm = result.results()
+ # OR(AND(a,b), c): AND(a,b)=intersect([1,5],[2,6])=[2,5], c=[3,7] =>
union=[2,7]
+ # OR(AND(a,c), b): AND(a,c)=intersect([1,5],[3,7])=[3,5], b=[2,6] =>
union=[2,6]
+ # top AND: intersect([2,7],[2,6]) = [2,6]
+ self.assertEqual(bm.cardinality(), 5)
+ for v in [2, 3, 4, 5, 6]:
+ self.assertTrue(bm.contains(v))
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_same_field_predicates_not_accessed_concurrently(self):
+ import threading
+ import time
+
+ fields = _make_fields()
+
+ concurrency = [0]
+ max_concurrency = [0]
+ lock = threading.Lock()
+
+ class ConcurrencyDetectingReader(GlobalIndexReader):
+ def __init__(self, result):
+ self._result = result
+
+ def visit_equal(self, field_ref, literal):
+ with lock:
+ concurrency[0] += 1
+ max_concurrency[0] = max(max_concurrency[0],
concurrency[0])
+ time.sleep(0.05)
+ with lock:
+ concurrency[0] -= 1
+ return self._result
+
+ def close(self):
+ pass
+
+ result_a = GlobalIndexResult.from_range(Range(1, 5))
+ reader = ConcurrencyDetectingReader(result_a)
+
+ executor = ThreadPoolExecutor(max_workers=4)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [reader],
+ executor,
+ )
+
+ # AND(a=1, a=2, a=3) — all same field, must not run concurrently
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a', literals=[1]),
+ Predicate(method='equal', index=0, field='a', literals=[2]),
+ Predicate(method='equal', index=0, field='a', literals=[3]),
+ ],
+ )
+
+ evaluator.evaluate(predicate)
+
+ self.assertEqual(max_concurrency[0], 1)
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_mixed_nested_same_field_not_accessed_concurrently(self):
+ import threading
+ import time
+
+ fields = _make_fields()
+
+ concurrency_a = [0]
+ max_concurrency_a = [0]
+ lock = threading.Lock()
+
+ class ConcurrencyDetectingReader(GlobalIndexReader):
+ def __init__(self, result):
+ self._result = result
+
+ def visit_equal(self, field_ref, literal):
+ with lock:
+ concurrency_a[0] += 1
+ max_concurrency_a[0] = max(max_concurrency_a[0],
concurrency_a[0])
+ time.sleep(0.05)
+ with lock:
+ concurrency_a[0] -= 1
+ return self._result
+
+ def close(self):
+ pass
+
+ result_all = GlobalIndexResult.from_range(Range(1, 5))
+ detecting_reader = ConcurrencyDetectingReader(result_all)
+ normal_reader = StubGlobalIndexReader(result_all)
+
+ def readers_fn(field):
+ if field.id == 0:
+ return [detecting_reader]
+ return [normal_reader]
+
+ executor = ThreadPoolExecutor(max_workers=4)
+ evaluator = GlobalIndexEvaluator(fields, readers_fn, executor)
+
+ # AND(OR(a=1, b=2), OR(a=3, c=4)) — field a in both OR subtrees
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[1]),
+ Predicate(method='equal', index=1, field='b',
literals=[2]),
+ ],
+ ),
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[3]),
+ Predicate(method='equal', index=2, field='c',
literals=[4]),
+ ],
+ ),
+ ],
+ )
+
+ evaluator.evaluate(predicate)
+
+ self.assertEqual(max_concurrency_a[0], 1)
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_lazy_result_not_materialized_concurrently(self):
+ import threading
+ import time
+
+ fields = _make_fields()
+
+ concurrency = [0]
+ max_concurrency = [0]
+ lock = threading.Lock()
+
+ class LazyIOReader(GlobalIndexReader):
+ def visit_equal(self, field_ref, literal):
+ def supplier():
+ with lock:
+ concurrency[0] += 1
+ max_concurrency[0] = max(max_concurrency[0],
concurrency[0])
+ time.sleep(0.05)
+ with lock:
+ concurrency[0] -= 1
+ return GlobalIndexResult.from_range(Range(1, 3)).results()
+
+ return GlobalIndexResult.create(supplier)
+
+ def close(self):
+ pass
+
+ lazy_reader = LazyIOReader()
+
+ def readers_fn(field):
+ if field.id == 0:
+ return [lazy_reader]
+ return
[StubGlobalIndexReader(GlobalIndexResult.from_range(Range(1, 5)))]
+
+ executor = ThreadPoolExecutor(max_workers=4)
+ evaluator = GlobalIndexEvaluator(fields, readers_fn, executor)
+
+ # AND(OR(a=1, b=2), OR(a=3, c=4)) — field a in both OR subtrees
+ # lazy results for field a must not be materialized concurrently
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[1]),
+ Predicate(method='equal', index=1, field='b',
literals=[2]),
+ ],
+ ),
+ Predicate(
+ method='or', index=None, field=None,
+ literals=[
+ Predicate(method='equal', index=0, field='a',
literals=[3]),
+ Predicate(method='equal', index=2, field='c',
literals=[4]),
+ ],
+ ),
+ ],
+ )
+
+ evaluator.evaluate(predicate)
+
+ self.assertEqual(max_concurrency[0], 1)
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_multiple_readers_per_field_combined_with_and(self):
+ fields = _make_fields()
+ reader_result1 = GlobalIndexResult.from_range(Range(1, 5))
+ reader_result2 = GlobalIndexResult.from_range(Range(3, 7))
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [
+ StubGlobalIndexReader(reader_result1),
+ StubGlobalIndexReader(reader_result2),
+ ],
+ executor,
+ )
+
+ predicate = Predicate(method='equal', index=0, field='a',
literals=[42])
+ result = evaluator.evaluate(predicate)
+
+ self.assertIsNotNone(result)
+ bm = result.results()
+ # Multiple readers for same field are combined with AND (intersection)
+ self.assertEqual(bm.cardinality(), 3)
+ for v in [3, 4, 5]:
+ self.assertTrue(bm.contains(v))
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_non_field_leaf_predicate_does_not_throw(self):
+ fields = _make_fields()
+ result_a = GlobalIndexResult.from_range(Range(1, 3))
+
+ executor = ThreadPoolExecutor(max_workers=2)
+ evaluator = GlobalIndexEvaluator(
+ fields,
+ lambda field: [StubGlobalIndexReader(result_a)],
+ executor,
+ )
+
+ # AND(non-field leaf, a=1) — non-field leaf has field=None
+ predicate = Predicate(
+ method='and', index=None, field=None,
+ literals=[
+ Predicate(method='alwaysTrue', index=None, field=None,
literals=[]),
+ Predicate(method='equal', index=0, field='a', literals=[42]),
+ ],
+ )
+
+ result = evaluator.evaluate(predicate)
+
+ # alwaysTrue has no reader support, returns None — AND ignores it
+ self.assertIsNotNone(result)
+ self.assertEqual(result.results().cardinality(), 3)
+ evaluator.close()
+ executor.shutdown(wait=False)
+
+ def test_null_predicate(self):
+ fields = _make_fields()
+ evaluator = GlobalIndexEvaluator(fields, lambda field: [])
+
+ result = evaluator.evaluate(None)
+
+ self.assertIsNone(result)
+ evaluator.close()
+
+
+if __name__ == '__main__':
+ unittest.main()