This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new e7fbdc8f6c [hotfix] Refactor ShardGlobalIndexScanner and GlobalIndexEvaluator
e7fbdc8f6c is described below
commit e7fbdc8f6c259faff560894fe30d3b22d4fef7af
Author: JingsongLi <[email protected]>
AuthorDate: Mon Nov 24 17:41:35 2025 +0800
[hotfix] Refactor ShardGlobalIndexScanner and GlobalIndexEvaluator
---
.../paimon/globalindex/GlobalIndexEvaluator.java | 186 +++++++--------------
.../globalindex/ShardGlobalIndexScanner.java | 159 ++++++++----------
2 files changed, 129 insertions(+), 216 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
index 32c551b337..36e22fd144 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexEvaluator.java
@@ -25,125 +25,94 @@ import org.apache.paimon.predicate.Or;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateVisitor;
import org.apache.paimon.predicate.TransformPredicate;
-import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.paimon.utils.IOUtils;
import javax.annotation.Nullable;
import java.io.Closeable;
-import java.io.IOException;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
-import java.util.function.Function;
+import java.util.function.IntFunction;
import java.util.stream.Collectors;
/** Predicate for filtering data using global indexes. */
-public class GlobalIndexEvaluator implements Closeable {
-
- private static final Logger LOG = LoggerFactory.getLogger(GlobalIndexEvaluator.class);
+public class GlobalIndexEvaluator
+ implements Closeable, PredicateVisitor<Optional<GlobalIndexResult>> {
- private final Function<Integer, Collection<GlobalIndexReader>> readersFunction;
- private final Map<String, DataField> fieldNameToField;
+ private final RowType rowType;
+ private final IntFunction<Collection<GlobalIndexReader>> readersFunction;
private final Map<Integer, Collection<GlobalIndexReader>>
indexReadersCache = new HashMap<>();
public GlobalIndexEvaluator(
- Function<Integer, Collection<GlobalIndexReader>> readersFunction, RowType rowType) {
+ RowType rowType, IntFunction<Collection<GlobalIndexReader>> readersFunction) {
+ this.rowType = rowType;
this.readersFunction = readersFunction;
- this.fieldNameToField = new HashMap<>();
- for (DataField dataField : rowType.getFields()) {
- fieldNameToField.put(dataField.name(), dataField);
- }
}
public Optional<GlobalIndexResult> evaluate(@Nullable Predicate predicate)
{
if (predicate == null) {
return Optional.empty();
}
- Set<Integer> requiredFieldIds = getRequiredFieldIds(predicate);
- requiredFieldIds.forEach(id -> indexReadersCache.computeIfAbsent(id, readersFunction));
- return new FileIndexPredicateTest(fieldNameToField, indexReadersCache).test(predicate);
+ return predicate.visit(this);
}
public void close() {
- closePredicatorsQuietly(
- indexReadersCache.values().stream().flatMap(Collection::stream).iterator());
+ IOUtils.closeAllQuietly(
+ indexReadersCache.values().stream()
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList()));
}
- private void closePredicatorsQuietly(Iterator<GlobalIndexReader> iterator)
{
- while (iterator.hasNext()) {
- try {
- iterator.next().close();
- } catch (IOException e) {
- LOG.warn("Failed to close GlobalIndexLeafPredicator", e);
+ @Override
+ public Optional<GlobalIndexResult> visit(LeafPredicate predicate) {
+ Optional<GlobalIndexResult> compoundResult = Optional.empty();
+ FieldRef fieldRef =
+ new FieldRef(predicate.index(), predicate.fieldName(),
predicate.type());
+ int fieldId = rowType.getField(predicate.fieldName()).id();
+ Collection<GlobalIndexReader> readers =
+ indexReadersCache.computeIfAbsent(fieldId,
readersFunction::apply);
+ for (GlobalIndexReader fileIndexReader : readers) {
+ Optional<GlobalIndexResult> childResult =
+ predicate.function().visit(fileIndexReader, fieldRef,
predicate.literals());
+
+ // AND Operation
+ if (childResult.isPresent()) {
+ if (compoundResult.isPresent()) {
+ GlobalIndexResult r1 = compoundResult.get();
+ GlobalIndexResult r2 = childResult.get();
+ compoundResult = Optional.of(r1.and(r2));
+ } else {
+ compoundResult = childResult;
+ }
}
- }
- }
-
- private Set<Integer> getRequiredFieldIds(Predicate filePredicate) {
- return filePredicate.visit(
- new PredicateVisitor<Set<Integer>>() {
-
- @Override
- public Set<Integer> visit(LeafPredicate predicate) {
- return Collections.singleton(
- fieldNameToField.get(predicate.fieldName()).id());
- }
- @Override
- public Set<Integer> visit(CompoundPredicate predicate) {
- Set<Integer> result = new HashSet<>();
- for (Predicate child : predicate.children()) {
- child.visit(this);
- result.addAll(child.visit(this));
- }
- return result;
- }
-
- @Override
- public Set<Integer> visit(TransformPredicate predicate) {
- return predicate.fieldNames().stream()
- .map(f -> fieldNameToField.get(f).id())
- .collect(Collectors.toSet());
- }
- });
- }
-
- /** Predicate test worker. */
- private static class FileIndexPredicateTest
- implements PredicateVisitor<Optional<GlobalIndexResult>> {
-
- private final Map<String, DataField> fieldColumnToId;
- private final Map<Integer, Collection<GlobalIndexReader>>
columnIndexReaders;
-
- public FileIndexPredicateTest(
- Map<String, DataField> fieldColumnToId,
- Map<Integer, Collection<GlobalIndexReader>> fileIndexReaders) {
- this.fieldColumnToId = fieldColumnToId;
- this.columnIndexReaders = fileIndexReaders;
+ if (compoundResult.isPresent() &&
!compoundResult.get().iterator().hasNext()) {
+ return compoundResult;
+ }
}
+ return compoundResult;
+ }
- public Optional<GlobalIndexResult> test(Predicate predicate) {
- return predicate.visit(this);
- }
+ @Override
+ public Optional<GlobalIndexResult> visit(CompoundPredicate predicate) {
+ if (predicate.function() instanceof Or) {
+ GlobalIndexResult compoundResult = GlobalIndexResult.createEmpty();
+ for (Predicate predicate1 : predicate.children()) {
+ Optional<GlobalIndexResult> childResult =
predicate1.visit(this);
- @Override
- public Optional<GlobalIndexResult> visit(LeafPredicate predicate) {
+ if (!childResult.isPresent()) {
+ return Optional.empty();
+ }
+ compoundResult = compoundResult.or(childResult.get());
+ }
+ return Optional.of(compoundResult);
+ } else {
Optional<GlobalIndexResult> compoundResult = Optional.empty();
- FieldRef fieldRef =
- new FieldRef(predicate.index(), predicate.fieldName(),
predicate.type());
- for (GlobalIndexReader fileIndexReader :
- columnIndexReaders.get(fieldColumnToId.get(predicate.fieldName()).id())) {
- Optional<GlobalIndexResult> childResult =
- predicate.function().visit(fileIndexReader, fieldRef,
predicate.literals());
+ for (Predicate predicate1 : predicate.children()) {
+ Optional<GlobalIndexResult> childResult =
predicate1.visit(this);
// AND Operation
if (childResult.isPresent()) {
@@ -156,54 +125,17 @@ public class GlobalIndexEvaluator implements Closeable {
}
}
+ // if not remain, no need to test anymore
if (compoundResult.isPresent() &&
!compoundResult.get().iterator().hasNext()) {
return compoundResult;
}
}
return compoundResult;
}
+ }
- @Override
- public Optional<GlobalIndexResult> visit(CompoundPredicate predicate) {
- if (predicate.function() instanceof Or) {
- GlobalIndexResult compoundResult =
GlobalIndexResult.createEmpty();
- for (Predicate predicate1 : predicate.children()) {
- Optional<GlobalIndexResult> childResult =
predicate1.visit(this);
-
- if (!childResult.isPresent()) {
- return Optional.empty();
- }
- compoundResult = compoundResult.or(childResult.get());
- }
- return Optional.of(compoundResult);
- } else {
- Optional<GlobalIndexResult> compoundResult = Optional.empty();
- for (Predicate predicate1 : predicate.children()) {
- Optional<GlobalIndexResult> childResult =
predicate1.visit(this);
-
- // AND Operation
- if (childResult.isPresent()) {
- if (compoundResult.isPresent()) {
- GlobalIndexResult r1 = compoundResult.get();
- GlobalIndexResult r2 = childResult.get();
- compoundResult = Optional.of(r1.and(r2));
- } else {
- compoundResult = childResult;
- }
- }
-
- // if not remain, no need to test anymore
- if (compoundResult.isPresent() &&
!compoundResult.get().iterator().hasNext()) {
- return compoundResult;
- }
- }
- return compoundResult;
- }
- }
-
- @Override
- public Optional<GlobalIndexResult> visit(TransformPredicate predicate)
{
- return Optional.empty();
- }
+ @Override
+ public Optional<GlobalIndexResult> visit(TransformPredicate predicate) {
+ return Optional.empty();
}
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/ShardGlobalIndexScanner.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/ShardGlobalIndexScanner.java
index eff0c1dded..ab455de7fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/globalindex/ShardGlobalIndexScanner.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/ShardGlobalIndexScanner.java
@@ -26,7 +26,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Range;
@@ -41,20 +41,17 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
-import java.util.function.Function;
+import java.util.function.IntFunction;
import java.util.stream.Collectors;
import static org.apache.paimon.utils.Preconditions.checkArgument;
+import static org.apache.paimon.utils.Preconditions.checkNotNull;
/** Scanner for shard-based global indexes. */
public class ShardGlobalIndexScanner implements Closeable {
- private final FileStoreTable fileStoreTable;
- private final GlobalIndexFileReadWrite globalIndexFileInputHelper;
- private final Map<Integer, Map<String, List<IndexFileMeta>>> indexMetas;
- private final Map<Integer, DataField> columnIdToDataField;
-
- private GlobalIndexEvaluator globalIndexEvaluator;
+ private final Options options;
+ private final GlobalIndexEvaluator globalIndexEvaluator;
public ShardGlobalIndexScanner(
FileStoreTable fileStoreTable,
@@ -62,18 +59,30 @@ public class ShardGlobalIndexScanner implements Closeable {
long rowRangeStart,
long rowRangeEnd,
List<IndexManifestEntry> entries) {
- this.fileStoreTable = fileStoreTable;
+ this.options = fileStoreTable.coreOptions().toConfiguration();
+ for (IndexManifestEntry entry : entries) {
+ GlobalIndexMeta meta = entry.indexFile().globalIndexMeta();
+ checkArgument(
+ meta != null
+ && Range.intersect(
+ rowRangeStart,
+ rowRangeEnd,
+ meta.rowRangeStart(),
+ meta.rowRangeEnd()),
+ "All index files must have an intersection with row range ["
+ + rowRangeStart
+ + ", "
+ + rowRangeEnd
+ + ")");
+ }
+
FileIO fileIO = fileStoreTable.fileIO();
- this.globalIndexFileInputHelper =
+ GlobalIndexFileReadWrite indexFileReadWrite =
new GlobalIndexFileReadWrite(
fileIO,
fileStoreTable.store().pathFactory().indexFileFactory(partition, 0));
- RowType rowType = fileStoreTable.rowType();
- this.columnIdToDataField = new HashMap<>();
- for (DataField dataField : rowType.getFields()) {
- columnIdToDataField.put(dataField.id(), dataField);
- }
- this.indexMetas = new HashMap<>();
+
+ Map<Integer, Map<String, List<IndexFileMeta>>> indexMetas = new
HashMap<>();
for (IndexManifestEntry entry : entries) {
GlobalIndexMeta meta = entry.indexFile().globalIndexMeta();
checkArgument(meta != null, "Global index meta must not be null");
@@ -85,88 +94,60 @@ public class ShardGlobalIndexScanner implements Closeable {
.add(entry.indexFile());
}
- checkArgument(
- entries.stream()
- .allMatch(
- m ->
- m.indexFile().globalIndexMeta() != null
- && Range.intersect(
- rowRangeStart,
- rowRangeEnd,
- m.indexFile()
- .globalIndexMeta()
- .rowRangeStart(),
- m.indexFile()
- .globalIndexMeta()
- .rowRangeEnd())),
- "All index files must have an intersection with row range ["
- + rowRangeStart
- + ", "
- + rowRangeEnd
- + ")");
+ RowType rowType = fileStoreTable.rowType();
+
+ IntFunction<Collection<GlobalIndexReader>> readersFunction =
+ fieldId ->
+ createReaders(
+ indexFileReadWrite,
+ indexMetas.get(fieldId),
+ rowType.getField(fieldId).type());
+ this.globalIndexEvaluator = new GlobalIndexEvaluator(rowType,
readersFunction);
}
public Optional<GlobalIndexResult> scan(Predicate predicate) {
- GlobalIndexEvaluator globalIndexPredicate = getGlobalIndexEvaluator();
- return globalIndexPredicate.evaluate(predicate);
+ return globalIndexEvaluator.evaluate(predicate);
}
- private GlobalIndexEvaluator getGlobalIndexEvaluator() {
- if (globalIndexEvaluator == null) {
- Function<Integer, Collection<GlobalIndexReader>> readerFunction =
- fieldId -> {
- if (!indexMetas.containsKey(fieldId)) {
- return Collections.emptyList();
- }
- Map<String, List<IndexFileMeta>> indexMetas =
this.indexMetas.get(fieldId);
-
- Set<GlobalIndexReader> readers = new HashSet<>();
- try {
- for (Map.Entry<String, List<IndexFileMeta>> entry :
- indexMetas.entrySet()) {
- String indexType = entry.getKey();
- List<IndexFileMeta> metas = entry.getValue();
- GlobalIndexerFactory globalIndexerFactory =
- GlobalIndexerFactoryUtils.load(indexType);
- GlobalIndexer globalIndexer =
- globalIndexerFactory.create(
- columnIdToDataField.get(fieldId).type(),
- new Options(fileStoreTable.options()));
- readers.add(
- globalIndexer.createReader(
- globalIndexFileInputHelper,
- metas.stream()
- .map(
- meta ->
- new GlobalIndexIOMeta(
- meta.fileName(),
- meta.fileSize(),
- new Range(
- meta.globalIndexMeta()
- .rowRangeStart(),
- meta.globalIndexMeta()
- .rowRangeEnd()),
- meta.globalIndexMeta()
- .indexMeta()))
- .collect(Collectors.toList())));
- }
- } catch (IOException e) {
- throw new RuntimeException("Failed to create global index reader", e);
- }
-
- return readers;
- };
-
- globalIndexEvaluator =
- new GlobalIndexEvaluator(readerFunction,
fileStoreTable.rowType());
+ private Collection<GlobalIndexReader> createReaders(
+ GlobalIndexFileReadWrite indexFileReadWrite,
+ Map<String, List<IndexFileMeta>> indexMetas,
+ DataType fieldType) {
+ if (indexMetas == null) {
+ return Collections.emptyList();
+ }
+
+ Set<GlobalIndexReader> readers = new HashSet<>();
+ try {
+ for (Map.Entry<String, List<IndexFileMeta>> entry :
indexMetas.entrySet()) {
+ String indexType = entry.getKey();
+ List<IndexFileMeta> metas = entry.getValue();
+ GlobalIndexerFactory globalIndexerFactory =
+ GlobalIndexerFactoryUtils.load(indexType);
+ GlobalIndexer globalIndexer =
globalIndexerFactory.create(fieldType, options);
+ List<GlobalIndexIOMeta> globalMetas =
+ metas.stream().map(this::toGlobalMeta).collect(Collectors.toList());
+ readers.add(globalIndexer.createReader(indexFileReadWrite,
globalMetas));
+ }
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to create global index reader",
e);
}
- return globalIndexEvaluator;
+
+ return readers;
+ }
+
+ private GlobalIndexIOMeta toGlobalMeta(IndexFileMeta meta) {
+ GlobalIndexMeta globalIndex = meta.globalIndexMeta();
+ checkNotNull(globalIndex);
+ return new GlobalIndexIOMeta(
+ meta.fileName(),
+ meta.fileSize(),
+ new Range(globalIndex.rowRangeStart(),
globalIndex.rowRangeEnd()),
+ globalIndex.indexMeta());
}
@Override
public void close() throws IOException {
- if (globalIndexEvaluator != null) {
- globalIndexEvaluator.close();
- }
+ globalIndexEvaluator.close();
}
}