This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new e7fbdc8f6c [hotfix] Refactor ShardGlobalIndexScanner and GlobalIndexEvaluator
e7fbdc8f6c is described below

commit e7fbdc8f6c259faff560894fe30d3b22d4fef7af
Author: JingsongLi <[email protected]>
AuthorDate: Mon Nov 24 17:41:35 2025 +0800

    [hotfix] Refactor ShardGlobalIndexScanner and GlobalIndexEvaluator
---
 .../paimon/globalindex/GlobalIndexEvaluator.java   | 186 +++++++--------------
 .../globalindex/ShardGlobalIndexScanner.java       | 159 ++++++++----------
 2 files changed, 129 insertions(+), 216 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 32c551b337..36e22fd144 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -25,125 +25,94 @@ import org.apache.paimon.predicate.Or;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateVisitor;
 import org.apache.paimon.predicate.TransformPredicate;
-import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.RowType;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.paimon.utils.IOUtils;
 
 import javax.annotation.Nullable;
 
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
+import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
 /** Predicate for filtering data using global indexes. */
-public class GlobalIndexEvaluator implements Closeable {
-
-    private static final Logger LOG = 
LoggerFactory.getLogger(GlobalIndexEvaluator.class);
+public class GlobalIndexEvaluator
+        implements Closeable, PredicateVisitor<Optional<GlobalIndexResult>> {
 
-    private final Function<Integer, Collection<GlobalIndexReader>> 
readersFunction;
-    private final Map<String, DataField> fieldNameToField;
+    private final RowType rowType;
+    private final IntFunction<Collection<GlobalIndexReader>> readersFunction;
     private final Map<Integer, Collection<GlobalIndexReader>> 
indexReadersCache = new HashMap<>();
 
     public GlobalIndexEvaluator(
-            Function<Integer, Collection<GlobalIndexReader>> readersFunction, 
RowType rowType) {
+            RowType rowType, IntFunction<Collection<GlobalIndexReader>> 
readersFunction) {
+        this.rowType = rowType;
         this.readersFunction = readersFunction;
-        this.fieldNameToField = new HashMap<>();
-        for (DataField dataField : rowType.getFields()) {
-            fieldNameToField.put(dataField.name(), dataField);
-        }
     }
 
     public Optional<GlobalIndexResult> evaluate(@Nullable Predicate predicate) 
{
         if (predicate == null) {
             return Optional.empty();
         }
-        Set<Integer> requiredFieldIds = getRequiredFieldIds(predicate);
-        requiredFieldIds.forEach(id -> indexReadersCache.computeIfAbsent(id, 
readersFunction));
-        return new FileIndexPredicateTest(fieldNameToField, 
indexReadersCache).test(predicate);
+        return predicate.visit(this);
     }
 
     public void close() {
-        closePredicatorsQuietly(
-                
indexReadersCache.values().stream().flatMap(Collection::stream).iterator());
+        IOUtils.closeAllQuietly(
+                indexReadersCache.values().stream()
+                        .flatMap(Collection::stream)
+                        .collect(Collectors.toList()));
     }
 
-    private void closePredicatorsQuietly(Iterator<GlobalIndexReader> iterator) 
{
-        while (iterator.hasNext()) {
-            try {
-                iterator.next().close();
-            } catch (IOException e) {
-                LOG.warn("Failed to close GlobalIndexLeafPredicator", e);
+    @Override
+    public Optional<GlobalIndexResult> visit(LeafPredicate predicate) {
+        Optional<GlobalIndexResult> compoundResult = Optional.empty();
+        FieldRef fieldRef =
+                new FieldRef(predicate.index(), predicate.fieldName(), 
predicate.type());
+        int fieldId = rowType.getField(predicate.fieldName()).id();
+        Collection<GlobalIndexReader> readers =
+                indexReadersCache.computeIfAbsent(fieldId, 
readersFunction::apply);
+        for (GlobalIndexReader fileIndexReader : readers) {
+            Optional<GlobalIndexResult> childResult =
+                    predicate.function().visit(fileIndexReader, fieldRef, 
predicate.literals());
+
+            // AND Operation
+            if (childResult.isPresent()) {
+                if (compoundResult.isPresent()) {
+                    GlobalIndexResult r1 = compoundResult.get();
+                    GlobalIndexResult r2 = childResult.get();
+                    compoundResult = Optional.of(r1.and(r2));
+                } else {
+                    compoundResult = childResult;
+                }
             }
-        }
-    }
-
-    private Set<Integer> getRequiredFieldIds(Predicate filePredicate) {
-        return filePredicate.visit(
-                new PredicateVisitor<Set<Integer>>() {
-
-                    @Override
-                    public Set<Integer> visit(LeafPredicate predicate) {
-                        return Collections.singleton(
-                                
fieldNameToField.get(predicate.fieldName()).id());
-                    }
 
-                    @Override
-                    public Set<Integer> visit(CompoundPredicate predicate) {
-                        Set<Integer> result = new HashSet<>();
-                        for (Predicate child : predicate.children()) {
-                            child.visit(this);
-                            result.addAll(child.visit(this));
-                        }
-                        return result;
-                    }
-
-                    @Override
-                    public Set<Integer> visit(TransformPredicate predicate) {
-                        return predicate.fieldNames().stream()
-                                .map(f -> fieldNameToField.get(f).id())
-                                .collect(Collectors.toSet());
-                    }
-                });
-    }
-
-    /** Predicate test worker. */
-    private static class FileIndexPredicateTest
-            implements PredicateVisitor<Optional<GlobalIndexResult>> {
-
-        private final Map<String, DataField> fieldColumnToId;
-        private final Map<Integer, Collection<GlobalIndexReader>> 
columnIndexReaders;
-
-        public FileIndexPredicateTest(
-                Map<String, DataField> fieldColumnToId,
-                Map<Integer, Collection<GlobalIndexReader>> fileIndexReaders) {
-            this.fieldColumnToId = fieldColumnToId;
-            this.columnIndexReaders = fileIndexReaders;
+            if (compoundResult.isPresent() && 
!compoundResult.get().iterator().hasNext()) {
+                return compoundResult;
+            }
         }
+        return compoundResult;
+    }
 
-        public Optional<GlobalIndexResult> test(Predicate predicate) {
-            return predicate.visit(this);
-        }
+    @Override
+    public Optional<GlobalIndexResult> visit(CompoundPredicate predicate) {
+        if (predicate.function() instanceof Or) {
+            GlobalIndexResult compoundResult = GlobalIndexResult.createEmpty();
+            for (Predicate predicate1 : predicate.children()) {
+                Optional<GlobalIndexResult> childResult = 
predicate1.visit(this);
 
-        @Override
-        public Optional<GlobalIndexResult> visit(LeafPredicate predicate) {
+                if (!childResult.isPresent()) {
+                    return Optional.empty();
+                }
+                compoundResult = compoundResult.or(childResult.get());
+            }
+            return Optional.of(compoundResult);
+        } else {
             Optional<GlobalIndexResult> compoundResult = Optional.empty();
-            FieldRef fieldRef =
-                    new FieldRef(predicate.index(), predicate.fieldName(), 
predicate.type());
-            for (GlobalIndexReader fileIndexReader :
-                    
columnIndexReaders.get(fieldColumnToId.get(predicate.fieldName()).id())) {
-                Optional<GlobalIndexResult> childResult =
-                        predicate.function().visit(fileIndexReader, fieldRef, 
predicate.literals());
+            for (Predicate predicate1 : predicate.children()) {
+                Optional<GlobalIndexResult> childResult = 
predicate1.visit(this);
 
                 // AND Operation
                 if (childResult.isPresent()) {
@@ -156,54 +125,17 @@ public class GlobalIndexEvaluator implements Closeable {
                     }
                 }
 
+                // if not remain, no need to test anymore
                 if (compoundResult.isPresent() && 
!compoundResult.get().iterator().hasNext()) {
                     return compoundResult;
                 }
             }
             return compoundResult;
         }
+    }
 
-        @Override
-        public Optional<GlobalIndexResult> visit(CompoundPredicate predicate) {
-            if (predicate.function() instanceof Or) {
-                GlobalIndexResult compoundResult = 
GlobalIndexResult.createEmpty();
-                for (Predicate predicate1 : predicate.children()) {
-                    Optional<GlobalIndexResult> childResult = 
predicate1.visit(this);
-
-                    if (!childResult.isPresent()) {
-                        return Optional.empty();
-                    }
-                    compoundResult = compoundResult.or(childResult.get());
-                }
-                return Optional.of(compoundResult);
-            } else {
-                Optional<GlobalIndexResult> compoundResult = Optional.empty();
-                for (Predicate predicate1 : predicate.children()) {
-                    Optional<GlobalIndexResult> childResult = 
predicate1.visit(this);
-
-                    // AND Operation
-                    if (childResult.isPresent()) {
-                        if (compoundResult.isPresent()) {
-                            GlobalIndexResult r1 = compoundResult.get();
-                            GlobalIndexResult r2 = childResult.get();
-                            compoundResult = Optional.of(r1.and(r2));
-                        } else {
-                            compoundResult = childResult;
-                        }
-                    }
-
-                    // if not remain, no need to test anymore
-                    if (compoundResult.isPresent() && 
!compoundResult.get().iterator().hasNext()) {
-                        return compoundResult;
-                    }
-                }
-                return compoundResult;
-            }
-        }
-
-        @Override
-        public Optional<GlobalIndexResult> visit(TransformPredicate predicate) 
{
-            return Optional.empty();
-        }
+    @Override
+    public Optional<GlobalIndexResult> visit(TransformPredicate predicate) {
+        return Optional.empty();
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/ShardGlobalIndexScanner.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/ShardGlobalIndexScanner.java
index eff0c1dded..ab455de7fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/ShardGlobalIndexScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/ShardGlobalIndexScanner.java
@@ -26,7 +26,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Range;
 
@@ -41,20 +41,17 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.function.Function;
+import java.util.function.IntFunction;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** Scanner for shard-based global indexes. */
 public class ShardGlobalIndexScanner implements Closeable {
 
-    private final FileStoreTable fileStoreTable;
-    private final GlobalIndexFileReadWrite globalIndexFileInputHelper;
-    private final Map<Integer, Map<String, List<IndexFileMeta>>> indexMetas;
-    private final Map<Integer, DataField> columnIdToDataField;
-
-    private GlobalIndexEvaluator globalIndexEvaluator;
+    private final Options options;
+    private final GlobalIndexEvaluator globalIndexEvaluator;
 
     public ShardGlobalIndexScanner(
             FileStoreTable fileStoreTable,
@@ -62,18 +59,30 @@ public class ShardGlobalIndexScanner implements Closeable {
             long rowRangeStart,
             long rowRangeEnd,
             List<IndexManifestEntry> entries) {
-        this.fileStoreTable = fileStoreTable;
+        this.options = fileStoreTable.coreOptions().toConfiguration();
+        for (IndexManifestEntry entry : entries) {
+            GlobalIndexMeta meta = entry.indexFile().globalIndexMeta();
+            checkArgument(
+                    meta != null
+                            && Range.intersect(
+                                    rowRangeStart,
+                                    rowRangeEnd,
+                                    meta.rowRangeStart(),
+                                    meta.rowRangeEnd()),
+                    "All index files must have an intersection with row range ["
+                            + rowRangeStart
+                            + ", "
+                            + rowRangeEnd
+                            + ")");
+        }
+
         FileIO fileIO = fileStoreTable.fileIO();
-        this.globalIndexFileInputHelper =
+        GlobalIndexFileReadWrite indexFileReadWrite =
                 new GlobalIndexFileReadWrite(
                         fileIO,
                         
fileStoreTable.store().pathFactory().indexFileFactory(partition, 0));
-        RowType rowType = fileStoreTable.rowType();
-        this.columnIdToDataField = new HashMap<>();
-        for (DataField dataField : rowType.getFields()) {
-            columnIdToDataField.put(dataField.id(), dataField);
-        }
-        this.indexMetas = new HashMap<>();
+
+        Map<Integer, Map<String, List<IndexFileMeta>>> indexMetas = new 
HashMap<>();
         for (IndexManifestEntry entry : entries) {
             GlobalIndexMeta meta = entry.indexFile().globalIndexMeta();
             checkArgument(meta != null, "Global index meta must not be null");
@@ -85,88 +94,60 @@ public class ShardGlobalIndexScanner implements Closeable {
                     .add(entry.indexFile());
         }
 
-        checkArgument(
-                entries.stream()
-                        .allMatch(
-                                m ->
-                                        m.indexFile().globalIndexMeta() != null
-                                                && Range.intersect(
-                                                        rowRangeStart,
-                                                        rowRangeEnd,
-                                                        m.indexFile()
-                                                                
.globalIndexMeta()
-                                                                
.rowRangeStart(),
-                                                        m.indexFile()
-                                                                
.globalIndexMeta()
-                                                                
.rowRangeEnd())),
-                "All index files must have an intersection with row range ["
-                        + rowRangeStart
-                        + ", "
-                        + rowRangeEnd
-                        + ")");
+        RowType rowType = fileStoreTable.rowType();
+
+        IntFunction<Collection<GlobalIndexReader>> readersFunction =
+                fieldId ->
+                        createReaders(
+                                indexFileReadWrite,
+                                indexMetas.get(fieldId),
+                                rowType.getField(fieldId).type());
+        this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType, 
readersFunction);
     }
 
     public Optional<GlobalIndexResult> scan(Predicate predicate) {
-        GlobalIndexEvaluator globalIndexPredicate = getGlobalIndexEvaluator();
-        return globalIndexPredicate.evaluate(predicate);
+        return globalIndexEvaluator.evaluate(predicate);
     }
 
-    private GlobalIndexEvaluator getGlobalIndexEvaluator() {
-        if (globalIndexEvaluator == null) {
-            Function<Integer, Collection<GlobalIndexReader>> readerFunction =
-                    fieldId -> {
-                        if (!indexMetas.containsKey(fieldId)) {
-                            return Collections.emptyList();
-                        }
-                        Map<String, List<IndexFileMeta>> indexMetas = 
this.indexMetas.get(fieldId);
-
-                        Set<GlobalIndexReader> readers = new HashSet<>();
-                        try {
-                            for (Map.Entry<String, List<IndexFileMeta>> entry :
-                                    indexMetas.entrySet()) {
-                                String indexType = entry.getKey();
-                                List<IndexFileMeta> metas = entry.getValue();
-                                GlobalIndexerFactory globalIndexerFactory =
-                                        
GlobalIndexerFactoryUtils.load(indexType);
-                                GlobalIndexer globalIndexer =
-                                        globalIndexerFactory.create(
-                                                
columnIdToDataField.get(fieldId).type(),
-                                                new 
Options(fileStoreTable.options()));
-                                readers.add(
-                                        globalIndexer.createReader(
-                                                globalIndexFileInputHelper,
-                                                metas.stream()
-                                                        .map(
-                                                                meta ->
-                                                                        new 
GlobalIndexIOMeta(
-                                                                               
 meta.fileName(),
-                                                                               
 meta.fileSize(),
-                                                                               
 new Range(
-                                                                               
         meta.globalIndexMeta()
-                                                                               
                 .rowRangeStart(),
-                                                                               
         meta.globalIndexMeta()
-                                                                               
                 .rowRangeEnd()),
-                                                                               
 meta.globalIndexMeta()
-                                                                               
         .indexMeta()))
-                                                        
.collect(Collectors.toList())));
-                            }
-                        } catch (IOException e) {
-                            throw new RuntimeException("Failed to create global index reader", e);
-                        }
-
-                        return readers;
-                    };
-
-            globalIndexEvaluator =
-                    new GlobalIndexEvaluator(readerFunction, 
fileStoreTable.rowType());
+    private Collection<GlobalIndexReader> createReaders(
+            GlobalIndexFileReadWrite indexFileReadWrite,
+            Map<String, List<IndexFileMeta>> indexMetas,
+            DataType fieldType) {
+        if (indexMetas == null) {
+            return Collections.emptyList();
+        }
+
+        Set<GlobalIndexReader> readers = new HashSet<>();
+        try {
+            for (Map.Entry<String, List<IndexFileMeta>> entry : 
indexMetas.entrySet()) {
+                String indexType = entry.getKey();
+                List<IndexFileMeta> metas = entry.getValue();
+                GlobalIndexerFactory globalIndexerFactory =
+                        GlobalIndexerFactoryUtils.load(indexType);
+                GlobalIndexer globalIndexer = 
globalIndexerFactory.create(fieldType, options);
+                List<GlobalIndexIOMeta> globalMetas =
+                        
metas.stream().map(this::toGlobalMeta).collect(Collectors.toList());
+                readers.add(globalIndexer.createReader(indexFileReadWrite, 
globalMetas));
+            }
+        } catch (IOException e) {
+            throw new RuntimeException("Failed to create global index reader", e);
         }
-        return globalIndexEvaluator;
+
+        return readers;
+    }
+
+    private GlobalIndexIOMeta toGlobalMeta(IndexFileMeta meta) {
+        GlobalIndexMeta globalIndex = meta.globalIndexMeta();
+        checkNotNull(globalIndex);
+        return new GlobalIndexIOMeta(
+                meta.fileName(),
+                meta.fileSize(),
+                new Range(globalIndex.rowRangeStart(), 
globalIndex.rowRangeEnd()),
+                globalIndex.indexMeta());
     }
 
     @Override
     public void close() throws IOException {
-        if (globalIndexEvaluator != null) {
-            globalIndexEvaluator.close();
-        }
+        globalIndexEvaluator.close();
     }
 }

Reply via email to