jerry-024 commented on code in PR #7951:
URL: https://github.com/apache/paimon/pull/7951#discussion_r3297188532


##########
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java:
##########
@@ -23,80 +23,139 @@
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.IOUtils;
 
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
+import static 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+
 /** Predicate for filtering data using global indexes. */
-public class GlobalIndexEvaluator
-        implements Closeable, PredicateVisitor<Optional<GlobalIndexResult>> {
+public class GlobalIndexEvaluator implements Closeable {
 
     private final RowType rowType;
     private final IntFunction<Collection<GlobalIndexReader>> readersFunction;
-    private final Map<Integer, Collection<GlobalIndexReader>> 
indexReadersCache = new HashMap<>();
+    private final Map<Integer, Collection<GlobalIndexReader>> 
indexReadersCache;
+    private final ExecutorService executorService;
 
     public GlobalIndexEvaluator(
-            RowType rowType, IntFunction<Collection<GlobalIndexReader>> 
readersFunction) {
+            RowType rowType,
+            IntFunction<Collection<GlobalIndexReader>> readersFunction,
+            @Nullable ExecutorService executorService) {
         this.rowType = rowType;
         this.readersFunction = readersFunction;
+        this.executorService =
+                executorService == null ? newDirectExecutorService() : 
executorService;
+        this.indexReadersCache = new ConcurrentHashMap<>();
     }
 
     public Optional<GlobalIndexResult> evaluate(@Nullable Predicate predicate) 
{
-        return predicate == null ? Optional.empty() : predicate.visit(this);
+        if (predicate == null) {
+            return Optional.empty();
+        }
+        try {
+            return visitAsync(predicate).get();
+        } catch (Exception e) {
+            if (e.getCause() instanceof RuntimeException) {
+                throw (RuntimeException) e.getCause();
+            }
+            if (e.getCause() instanceof Error) {
+                throw (Error) e.getCause();
+            }
+            throw new RuntimeException(e.getCause() != null ? e.getCause() : 
e);
+        }

Review Comment:
   **[HIGH] InterruptedException wrapped without restoring the interrupt flag**
   
   `CompletableFuture.get()` throws `InterruptedException` when the calling thread is interrupted. The current `catch (Exception e)` catches it and rethrows it wrapped in a `RuntimeException`, but never restores the thread's interrupt flag. Code further up the stack therefore cannot tell that an interrupt happened, which breaks upstream cancel/shutdown semantics.
   
   Suggest:
   ```java
   } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw new RuntimeException("Interrupted during index evaluation", e);
   } catch (ExecutionException e) {
       if (e.getCause() instanceof RuntimeException) {
           throw (RuntimeException) e.getCause();
       }
       if (e.getCause() instanceof Error) {
           throw (Error) e.getCause();
       }
       throw new RuntimeException(e.getCause());
   }
   ```
   
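   This is not a style issue — it affects upper-layer cancellation and shutdown semantics. Note that the suggested code also needs `import java.util.concurrent.ExecutionException;`, which is not in the current import list.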



##########
paimon-common/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java:
##########
@@ -128,6 +180,22 @@ public Optional<GlobalIndexResult> visit(CompoundPredicate 
predicate) {
         }
     }
 
+    private List<Predicate> flattenChildren(CompoundPredicate predicate) {
+        List<Predicate> result = new ArrayList<>();
+        for (Predicate child : predicate.children()) {
+            if (child instanceof CompoundPredicate) {
+                CompoundPredicate compound = (CompoundPredicate) child;
+                if (compound.function().equals(predicate.function())) {
+                    result.addAll(flattenChildren(compound));
+                    continue;
+                }
+            }
+            result.add(child);
+        }
+        return result;

Review Comment:
   **[HIGH] Unbounded recursion depth — not just flattenChildren**
   
   `flattenChildren()` recurses on same-type nesting (`AND(AND(AND(...)))`) and 
can overflow the stack with deeply nested predicates from crafted SQL.
   
   Mixed-type nesting (`AND(OR(AND(OR(...))))`), however, also recurses without bound through the `visitAsync() → visitCompoundAsync() → visitAsync()` path. Making only `flattenChildren` iterative fixes the same-type case; complete protection needs a depth limit or a fully iterative approach that covers both paths.
   
   Suggest:
   1. Convert `flattenChildren` to iterative (Deque-based)
   2. Add a global depth guard (e.g., `MAX_PREDICATE_DEPTH = 1000`) that throws when the limit is exceeded, covering both the `flattenChildren` and the `visitAsync` recursion



##########
paimon-python/pypaimon/globalindex/global_index_evaluator.py:
##########
@@ -25,108 +27,198 @@
 from pypaimon.schema.data_types import DataField
 
 
+class _DirectExecutor(Executor):
+    """Executor that runs callables in the calling thread."""
+
+    def submit(self, fn, *args, **kwargs):
+        f = Future()
+        try:
+            result = fn(*args, **kwargs)
+            f.set_result(result)
+        except Exception as e:
+            f.set_exception(e)
+        return f
+
+    def shutdown(self, wait=True):
+        pass
+
+
 class GlobalIndexEvaluator:
-    """
-    Predicate evaluator for filtering data using global indexes.
-    """
+    """Predicate evaluator for filtering data using global indexes."""
 
     def __init__(
         self,
         fields: List[DataField],
-        readers_function: Callable[[DataField], Collection[GlobalIndexReader]]
+        readers_function: Callable[[DataField], Collection[GlobalIndexReader]],
+        executor: Optional[Executor] = None,
     ):
         self._fields = fields
         self._field_by_name = {f.name: f for f in fields}
         self._readers_function = readers_function
         self._index_readers_cache: Dict[int, Collection[GlobalIndexReader]] = 
{}
+        self._reader_locks: Dict[int, threading.Lock] = {}
+        self._locks_lock = threading.Lock()
+        self._executor = executor if executor is not None else 
_DirectExecutor()
 
     def evaluate(
         self,
         predicate: Optional[Predicate]
     ) -> Optional[GlobalIndexResult]:
-        compound_result: Optional[GlobalIndexResult] = None
-        
-        # Evaluate predicate first
-        if predicate is not None:
-            compound_result = self._visit_predicate(predicate)
-        
-        return compound_result
+        if predicate is None:
+            return None
+        future = self._visit_async(predicate)
+        return future.result()
 
-    def _visit_predicate(self, predicate: Predicate) -> 
Optional[GlobalIndexResult]:
-        """Visit a predicate and return the index result."""
-        if predicate.method == 'and':
-            compound_result: Optional[GlobalIndexResult] = None
-            for child in predicate.literals:
-                child_result = self._visit_predicate(child)
-                
-                if child_result is not None:
-                    if compound_result is not None:
-                        compound_result = compound_result.and_(child_result)
-                    else:
-                        compound_result = child_result
-                
-                if compound_result is not None and compound_result.is_empty():
-                    return compound_result
-            
-            return compound_result
-        
-        elif predicate.method == 'or':
-            compound_result = GlobalIndexResult.create_empty()
-            for child in predicate.literals:
-                child_result = self._visit_predicate(child)
-                
-                if child_result is None:
-                    return None
-                
-                compound_result = compound_result.or_(child_result)
-            
-            return compound_result
-        
-        else:
-            # Leaf predicate
-            return self._visit_leaf_predicate(predicate)
+    def _visit_async(self, predicate) -> Future:
+        if isinstance(predicate, Predicate) and predicate.method in ('and', 
'or'):
+            return self._visit_compound_async(predicate)
+        return self._visit_leaf_async(predicate)
 
-    def _visit_leaf_predicate(self, predicate: Predicate) -> 
Optional[GlobalIndexResult]:
-        """Visit a leaf predicate and return the index result."""
+    def _visit_leaf_async(self, predicate: Predicate) -> Future:
         field = self._field_by_name.get(predicate.field)
         if field is None:
-            return None
-        
+            f = Future()
+            f.set_result(None)
+            return f
+
         field_id = field.id
         readers = self._index_readers_cache.get(field_id)
         if readers is None:
             readers = self._readers_function(field)
             self._index_readers_cache[field_id] = readers
-        
+
         field_ref = FieldRef(predicate.index, predicate.field, str(field.type))
-        
-        compound_result: Optional[GlobalIndexResult] = None
-        
+
+        reader_futures = []
         for reader in readers:
-            child_result = self._visit_function(reader, predicate, field_ref)
+            lock = self._get_reader_lock(id(reader))
+            reader_futures.append(
+                self._executor.submit(
+                    self._visit_reader, reader, predicate, field_ref, lock
+                )
+            )
+
+        all_done = Future()
+        if not reader_futures:
+            all_done.set_result(None)
+            return all_done
+
+        remaining = [len(reader_futures)]
+        count_lock = threading.Lock()
+
+        def on_done(_):
+            with count_lock:
+                remaining[0] -= 1
+                if remaining[0] == 0:
+                    try:
+                        all_done.set_result(
+                            self._combine_reader_results(reader_futures)
+                        )
+                    except Exception as e:
+                        all_done.set_exception(e)
+
+        for rf in reader_futures:
+            rf.add_done_callback(on_done)
+
+        return all_done
+
+    def _visit_reader(self, reader, predicate, field_ref, lock):
+        with lock:
+            return self._visit_function(reader, predicate, field_ref)
+
+    def _combine_reader_results(
+        self, reader_futures: List[Future]
+    ) -> Optional[GlobalIndexResult]:
+        compound_result: Optional[GlobalIndexResult] = None
+        for f in reader_futures:
+            child_result = f.result()
             if child_result is None:
                 continue
-            
             if compound_result is not None:
-                compound_result = compound_result.or_(child_result)
+                compound_result = compound_result.and_(child_result)
             else:
                 compound_result = child_result
-            
             if compound_result.is_empty():
                 return compound_result
-        
         return compound_result

Review Comment:
   **[HIGH] Undocumented semantic change: `or_` → `and_` for multi-reader leaf 
combination**
   
   The base Python code combined multiple readers of the same field using `or_` 
(union). This PR changes it to `and_` (intersection), which matches the Java 
side (Java base always used `and`).
   
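   This looks like a fix for a pre-existing Python/Java inconsistency, but the PR description only mentions parallelization — it doesn't call out this semantic correction. The unchanged early return on `compound_result.is_empty()` also suggests that intersection was the intended semantics. Under `or_`, an empty partial result could still be extended by a later reader, so returning early would have been incorrect. Please: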
   1. Explicitly note this as a bug fix in the PR description
   2. Add a multi-reader test that verifies the AND semantics (multiple readers 
per field returning overlapping results → intersection expected)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to