This is an automated email from the ASF dual-hosted git repository.
junhao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new aef59c80c6 [core][spark] Move btree builder from spark to core (#7156)
aef59c80c6 is described below
commit aef59c80c6a986b2dc4ad785a73499c38f49445d
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Jan 30 15:48:47 2026 +0800
[core][spark] Move btree builder from spark to core (#7156)
---
.../main/java/org/apache/paimon/utils/Range.java | 2 +
.../globalindex/GlobalIndexBuilderUtils.java | 6 +-
.../globalindex/RowIdIndexFieldsExtractor.java | 2 +-
.../globalindex/btree/BTreeGlobalIndexBuilder.java | 247 +++++++++++++++++++++
.../paimon/utils/MutableObjectIteratorAdapter.java | 92 ++++++++
...leTest.java => BitmapGlobalIndexTableTest.java} | 171 +-------------
.../paimon/table/BtreeGlobalIndexTableTest.java | 196 ++++++++++++++++
.../org/apache/paimon/table/TableTestBase.java | 3 +
.../paimon/flink/lookup/LookupTableTest.java | 3 -
.../globalindex/DefaultGlobalIndexBuilder.java | 4 +-
.../globalindex/btree/BTreeGlobalIndexBuilder.java | 154 -------------
.../globalindex/btree/BTreeIndexTopoBuilder.java | 44 ++--
12 files changed, 569 insertions(+), 355 deletions(-)
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
index 461a68fbab..4457a00254 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Range.java
@@ -29,6 +29,8 @@ import java.util.Objects;
/** Range represents from (inclusive) and to (inclusive). */
public class Range implements Serializable {
+ private static final long serialVersionUID = 1L;
+
public final long from;
public final long to;
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderUtils.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
similarity index 92%
rename from paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderUtils.java
rename to paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
index cd26dbfaf4..b53a0641ab 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/GlobalIndexBuilderUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
@@ -16,13 +16,9 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.globalindex;
+package org.apache.paimon.globalindex;
import org.apache.paimon.fs.Path;
-import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
-import org.apache.paimon.globalindex.GlobalIndexer;
-import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.index.IndexPathFactory;
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/RowIdIndexFieldsExtractor.java
similarity index 98%
rename from paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java
rename to paimon-core/src/main/java/org/apache/paimon/globalindex/RowIdIndexFieldsExtractor.java
index d3ca25a2cb..8eaf176891 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/RowIdIndexFieldsExtractor.java
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/RowIdIndexFieldsExtractor.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.paimon.spark.globalindex;
+package org.apache.paimon.globalindex;
import org.apache.paimon.codegen.CodeGenUtils;
import org.apache.paimon.codegen.Projection;
diff --git a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
new file mode 100644
index 0000000000..9dc9da7e6e
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
+import org.apache.paimon.globalindex.GlobalIndexWriter;
+import org.apache.paimon.globalindex.ResultEntry;
+import org.apache.paimon.globalindex.RowIdIndexFieldsExtractor;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.reader.RecordReader;
+import org.apache.paimon.sort.BinaryExternalSortBuffer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.SpecialFields;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.MutableObjectIteratorAdapter;
+import org.apache.paimon.utils.Range;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Objects;
+
+import static java.util.Collections.singletonList;
+import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter;
+import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Builder to build btree global index. */
+public class BTreeGlobalIndexBuilder implements Serializable {
+
+ private static final long serialVersionUID = 1L;
+
+ private static final double FLOATING = 1.2;
+
+ private final FileStoreTable table;
+ private final RowType rowType;
+ private final Options options;
+ private final long recordsPerRange;
+
+ private String indexType;
+ private DataField indexField;
+ private RowType readRowType;
+ private RowIdIndexFieldsExtractor extractor;
+
+ @Nullable private PartitionPredicate partitionPredicate;
+
+ public BTreeGlobalIndexBuilder(Table table) {
+ this.table = (FileStoreTable) table;
+ this.rowType = table.rowType();
+ this.options = this.table.coreOptions().toConfiguration();
+ this.recordsPerRange =
+ (long) (options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) * FLOATING);
+ }
+
+ public BTreeGlobalIndexBuilder withIndexType(String indexType) {
+ this.indexType = indexType;
+ return this;
+ }
+
+ public BTreeGlobalIndexBuilder withIndexField(String indexField) {
+ checkArgument(
+ rowType.containsField(indexField),
+ "Column '%s' does not exist in table '%s'.",
+ indexField,
+ table.fullName());
+ this.indexField = rowType.getField(indexField);
+ this.readRowType =
+ SpecialFields.rowTypeWithRowId(new RowType(singletonList(this.indexField)));
+ List<String> readColumns = new ArrayList<>(table.partitionKeys());
+ readColumns.addAll(readRowType.getFieldNames());
+ this.extractor =
+ new RowIdIndexFieldsExtractor(
+ SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns),
+ table.partitionKeys(),
+ this.indexField.name());
+ return this;
+ }
+
+ public BTreeGlobalIndexBuilder withPartitionPredicate(PartitionPredicate partitionPredicate) {
+ this.partitionPredicate = partitionPredicate;
+ return this;
+ }
+
+ public List<DataSplit> scan() {
+ // 1. read the whole dataset of target partitions
+ SnapshotReader snapshotReader = table.newSnapshotReader();
+ if (partitionPredicate != null) {
+ snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate);
+ }
+
+ return snapshotReader.read().dataSplits();
+ }
+
+ public List<CommitMessage> build(List<DataSplit> splits, IOManager ioManager)
+ throws IOException {
+ Range rowRange = calcRowRange(splits);
+ if (splits.isEmpty() || rowRange == null) {
+ return Collections.emptyList();
+ }
+
+ CoreOptions options = new CoreOptions(this.options);
+ BinaryExternalSortBuffer buffer =
+ BinaryExternalSortBuffer.create(
+ ioManager,
+ readRowType,
+ new int[] {0},
+ options.writeBufferSize(),
+ options.pageSize(),
+ options.localSortMaxNumFileHandles(),
+ options.spillCompressOptions(),
+ options.writeBufferSpillDiskSize(),
+ options.sequenceFieldSortOrderIsAscending());
+
+ List<Split> splitList = new ArrayList<>(splits);
+ RecordReader<InternalRow> reader =
+ table.newReadBuilder().withReadType(readRowType).newRead().createReader(splitList);
+ try (CloseableIterator<InternalRow> iterator = reader.toCloseableIterator()) {
+ while (iterator.hasNext()) {
+ InternalRow row = iterator.next();
+ buffer.write(row);
+ }
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ Iterator<InternalRow> iterator =
+ new MutableObjectIteratorAdapter<>(
+ buffer.sortedIterator(), new BinaryRow(readRowType.getFieldCount()));
+ List<CommitMessage> result = build(rowRange, iterator);
+ buffer.clear();
+
+ return result;
+ }
+
+ public List<CommitMessage> build(Range rowRange, Iterator<InternalRow> data)
+ throws IOException {
+ long counter = 0;
+ BinaryRow currentPart = null;
+ GlobalIndexParallelWriter currentWriter = null;
+ List<CommitMessage> commitMessages = new ArrayList<>();
+
+ while (data.hasNext()) {
+ InternalRow row = data.next();
+ BinaryRow partRow = extractor.extractPartition(row);
+
+ // the input is sorted by <partition, indexedField>
+ if (currentWriter != null) {
+ if (!Objects.equals(partRow, currentPart) || counter >= recordsPerRange) {
+ commitMessages.add(flushIndex(rowRange, currentWriter.finish(), currentPart));
+ currentWriter = null;
+ counter = 0;
+ }
+ }
+
+ // write <value, rowId> pair to index file
+ currentPart = partRow;
+ counter++;
+
+ if (currentWriter == null) {
+ currentWriter = createWriter();
+ }
+
+ // convert the original rowId to local rowId
+ long localRowId = extractor.extractRowId(row) - rowRange.from;
+ currentWriter.write(extractor.extractIndexField(row), localRowId);
+ }
+
+ if (counter > 0) {
+ commitMessages.add(flushIndex(rowRange, currentWriter.finish(), currentPart));
+ }
+
+ return commitMessages;
+ }
+
+ private GlobalIndexParallelWriter createWriter() throws IOException {
+ GlobalIndexParallelWriter currentWriter;
+ GlobalIndexWriter indexWriter = createIndexWriter(table, indexType, indexField, options);
+ if (!(indexWriter instanceof GlobalIndexParallelWriter)) {
+ throw new RuntimeException(
+ "Unexpected implementation, the index writer of BTree
should be an instance of GlobalIndexParallelWriter, but found: "
+ + indexWriter.getClass().getName());
+ }
+ currentWriter = (GlobalIndexParallelWriter) indexWriter;
+ return currentWriter;
+ }
+
+ private CommitMessage flushIndex(
+ Range rowRange, List<ResultEntry> resultEntries, BinaryRow partition)
+ throws IOException {
+ List<IndexFileMeta> indexFileMetas =
+ toIndexFileMetas(table, rowRange, indexField.id(), indexType, resultEntries);
+ DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
+ return new CommitMessageImpl(
+ partition, 0, null, dataIncrement, CompactIncrement.emptyIncrement());
+ }
+
+ public static Range calcRowRange(List<DataSplit> dataSplits) {
+ long start = Long.MAX_VALUE;
+ long end = Long.MIN_VALUE;
+ for (DataSplit dataSplit : dataSplits) {
+ for (DataFileMeta file : dataSplit.dataFiles()) {
+ long firstRowId = file.nonNullFirstRowId();
+ start = Math.min(start, firstRowId);
+ end = Math.max(end, firstRowId + file.rowCount());
+ }
+ }
+ return start == Long.MAX_VALUE ? null : new Range(start, end);
+ }
+}
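
For readers skimming the diff, this is roughly how the relocated builder is driven end to end. It is a minimal sketch lifted from BtreeGlobalIndexTableTest further down in this patch; the table, catalog, identifier() and ioManager are assumed to come from the surrounding test harness.

    // Build a BTree global index for column "f1" and commit the resulting index files.
    FileStoreTable table = (FileStoreTable) catalog.getTable(identifier()); // assumed test fixture
    BTreeGlobalIndexBuilder builder =
            new BTreeGlobalIndexBuilder(table)
                    .withIndexType(BTreeGlobalIndexerFactory.IDENTIFIER)
                    .withIndexField("f1");
    // scan() plans the splits of the target partitions; build() sorts them and writes the index.
    List<CommitMessage> commitMessages = builder.build(builder.scan(), ioManager);
    try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
        commit.commit(commitMessages);
    }
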
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/MutableObjectIteratorAdapter.java b/paimon-core/src/main/java/org/apache/paimon/utils/MutableObjectIteratorAdapter.java
new file mode 100644
index 0000000000..a4a99e12e7
--- /dev/null
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/MutableObjectIteratorAdapter.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+
+/**
+ * Adapter class that wraps a {@link MutableObjectIterator} into a standard Java {@link Iterator}.
+ *
+ * <p>This adapter handles the key differences between {@link MutableObjectIterator} and standard
+ * {@link Iterator}:
+ *
+ * <ul>
+ * <li>{@link MutableObjectIterator} returns {@code null} to indicate the end of iteration, while
+ * {@link Iterator} uses {@link #hasNext()} and {@link #next()} separation.
+ * <li>{@link MutableObjectIterator} can throw {@link IOException} from its {@code next()} method,
+ * while standard {@link Iterator} does not declare checked exceptions. This adapter wraps
+ * {@link IOException} into {@link RuntimeException}.
+ * </ul>
+ *
+ * @param <E> The element type of the iterator.
+ */
+public class MutableObjectIteratorAdapter<I extends E, E> implements Iterator<E> {
+
+ private final MutableObjectIterator<I> delegate;
+ private final I instance;
+ private E nextElement;
+ private boolean hasNext = false;
+ private boolean initialized = false;
+
+ /**
+ * Creates a new adapter wrapping the given {@link MutableObjectIterator}.
+ *
+ * @param delegate The iterator to wrap.
+ */
+ public MutableObjectIteratorAdapter(MutableObjectIterator<I> delegate, I instance) {
+ this.delegate = delegate;
+ this.instance = instance;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (!initialized) {
+ prefetch();
+ }
+ return hasNext;
+ }
+
+ @Override
+ public E next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+ E result = nextElement;
+ prefetch();
+ return result;
+ }
+
+ /**
+ * Prefetches the next element from the delegate iterator.
+ *
+ * <p>This method reads ahead one element to support the {@link #hasNext()} check required by
+ * the standard {@link Iterator} interface.
+ */
+ private void prefetch() {
+ try {
+ nextElement = delegate.next(instance);
+ hasNext = (nextElement != null);
+ initialized = true;
+ } catch (IOException e) {
+ throw new RuntimeException("Failed to read next element from
MutableObjectIterator", e);
+ }
+ }
+}
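
A minimal sketch of how this adapter is consumed, mirroring BTreeGlobalIndexBuilder#build above; `buffer` and `readRowType` are assumed to be the external sort buffer and read schema from that method.

    // Re-expose the sorted MutableObjectIterator as a plain java.util.Iterator.
    // The BinaryRow passed here is the reuse instance handed to MutableObjectIterator#next(reuse).
    Iterator<InternalRow> iterator =
            new MutableObjectIteratorAdapter<>(
                    buffer.sortedIterator(), new BinaryRow(readRowType.getFieldCount()));
    while (iterator.hasNext()) {
        InternalRow row = iterator.next();
        // consume rows in sorted order; any IOException surfaces as a RuntimeException
    }
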
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/GlobalIndexTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
similarity index 61%
rename from paimon-core/src/test/java/org/apache/paimon/table/GlobalIndexTableTest.java
rename to paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
index b375cea8d6..65ec24b9bb 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/GlobalIndexTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/BitmapGlobalIndexTableTest.java
@@ -21,11 +21,9 @@ package org.apache.paimon.table;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.globalindex.DataEvolutionBatchScan;
import org.apache.paimon.globalindex.GlobalIndexFileReadWrite;
-import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
import org.apache.paimon.globalindex.GlobalIndexResult;
import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
import org.apache.paimon.globalindex.GlobalIndexSingletonWriter;
@@ -35,13 +33,10 @@ import org.apache.paimon.globalindex.GlobalIndexerFactoryUtils;
import org.apache.paimon.globalindex.ResultEntry;
import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
import org.apache.paimon.globalindex.bitmap.BitmapGlobalIndexerFactory;
-import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory;
-import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.options.MemorySize;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
@@ -56,36 +51,22 @@ import org.apache.paimon.utils.RoaringNavigableMap64;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/** Test for BTree indexed batch scan. */
-public class GlobalIndexTableTest extends DataEvolutionTestBase {
-
- private static final Logger log = LoggerFactory.getLogger(GlobalIndexTableTest.class);
+public class BitmapGlobalIndexTableTest extends DataEvolutionTestBase {
@Test
public void testBitmapGlobalIndex() throws Exception {
- innerTestGlobalIndex(BitmapGlobalIndexerFactory.IDENTIFIER);
- }
-
- @Test
- public void testBTreeGlobalIndex() throws Exception {
- innerTestGlobalIndex(BTreeGlobalIndexerFactory.IDENTIFIER);
- }
-
- private void innerTestGlobalIndex(String indexType) throws Exception {
write(100000L);
- createIndex(indexType, "f1");
+ createIndex("f1");
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
@@ -130,17 +111,8 @@ public class GlobalIndexTableTest extends DataEvolutionTestBase {
@Test
public void testBitmapGlobalIndexWithCoreScan() throws Exception {
- innerTestGlobalIndexWithCoreScan(BitmapGlobalIndexerFactory.IDENTIFIER);
- }
-
- @Test
- public void testBTreeGlobalIndexWithCoreScan() throws Exception {
- innerTestGlobalIndexWithCoreScan(BTreeGlobalIndexerFactory.IDENTIFIER);
- }
-
- private void innerTestGlobalIndexWithCoreScan(String indexType) throws Exception {
write(100000L);
- createIndex(indexType, "f1");
+ createIndex("f1");
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
@@ -170,18 +142,9 @@ public class GlobalIndexTableTest extends DataEvolutionTestBase {
@Test
public void testMultipleBitmapIndices() throws Exception {
- innerTestMultipleIndices(BitmapGlobalIndexerFactory.IDENTIFIER);
- }
-
- @Test
- public void testMultipleBTreeIndices() throws Exception {
- innerTestMultipleIndices(BTreeGlobalIndexerFactory.IDENTIFIER);
- }
-
- private void innerTestMultipleIndices(String indexType) throws Exception {
write(100000L);
- createIndex(indexType, "f1");
- createIndex(indexType, "f2");
+ createIndex("f1");
+ createIndex("f2");
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
Predicate predicate1 =
@@ -217,7 +180,7 @@ public class GlobalIndexTableTest extends DataEvolutionTestBase {
Assertions.assertThat(result).containsExactly("a200", "a56789");
}
- private void createIndex(String indexType, String fieldName) throws Exception {
+ private void createIndex(String fieldName) throws Exception {
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
FileIO fileIO = table.fileIO();
RowType rowType =
SpecialFields.rowTypeWithRowTracking(table.rowType().project(fieldName));
@@ -231,25 +194,11 @@ public class GlobalIndexTableTest extends DataEvolutionTestBase {
table.store().pathFactory().indexFileFactory(BinaryRow.EMPTY_ROW, 0));
DataField indexField = table.rowType().getField(fieldName);
- GlobalIndexerFactory globalIndexerFactory = GlobalIndexerFactoryUtils.load(indexType);
-
- List<IndexFileMeta> indexFileMetas;
- if (indexType.equals(BTreeGlobalIndexerFactory.IDENTIFIER)) {
- indexFileMetas =
- createBTreeIndex(
- fileIO,
- globalIndexerFactory,
- indexField,
- rowType,
- reader,
- indexFileReadWrite);
- } else if (indexType.equals(BitmapGlobalIndexerFactory.IDENTIFIER)) {
- indexFileMetas =
- createBitmapIndex(
- fileIO, globalIndexerFactory, indexField, reader,
indexFileReadWrite);
- } else {
- throw new Exception("Unsupported scalar index type: " + indexType);
- }
+ GlobalIndexerFactory globalIndexerFactory =
+ GlobalIndexerFactoryUtils.load(BitmapGlobalIndexerFactory.IDENTIFIER);
+
+ List<IndexFileMeta> indexFileMetas =
+ createBitmapIndex(globalIndexerFactory, indexField, reader, indexFileReadWrite);
DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
@@ -265,7 +214,6 @@ public class GlobalIndexTableTest extends DataEvolutionTestBase {
}
private List<IndexFileMeta> createBitmapIndex(
- FileIO fileIO,
GlobalIndexerFactory indexerFactory,
DataField indexField,
RecordReader<InternalRow> reader,
@@ -296,103 +244,6 @@ public class GlobalIndexTableTest extends DataEvolutionTestBase {
null));
}
- private List<IndexFileMeta> createBTreeIndex(
- FileIO fileIO,
- GlobalIndexerFactory indexerFactory,
- DataField indexField,
- RowType rowType,
- RecordReader<InternalRow> reader,
- GlobalIndexFileReadWrite indexFileReadWrite)
- throws Exception {
- Options options = new Options();
- options.set(BTreeIndexOptions.BTREE_INDEX_CACHE_SIZE, MemorySize.ofMebiBytes(1));
- GlobalIndexer globalIndexer = indexerFactory.create(indexField, options);
-
- // collect all rows
- List<InternalRow> rows = new ArrayList<>();
- InternalRowSerializer rowSerializer = new InternalRowSerializer(rowType);
- reader.forEachRemaining(row -> rows.add(rowSerializer.copy(row)));
- // sort by row id
- rows.sort(Comparator.comparing(row -> row.getLong(1)));
- // intentionally split into two chunks, build index for each range
- int midRow = rows.size() / 2;
- Range firstRange = new Range(rows.get(0).getLong(1), rows.get(midRow).getLong(1));
- Range secondRange =
- new Range(rows.get(midRow + 1).getLong(1), rows.get(rows.size() - 1).getLong(1));
-
- List<IndexFileMeta> indexFileMetas = new ArrayList<>();
- indexFileMetas.addAll(
- createBTreeIndexForRange(
- fileIO,
- globalIndexer,
- indexField,
- rows.subList(0, midRow + 1),
- indexFileReadWrite,
- firstRange));
- indexFileMetas.addAll(
- createBTreeIndexForRange(
- fileIO,
- globalIndexer,
- indexField,
- rows.subList(midRow + 1, rows.size()),
- indexFileReadWrite,
- secondRange));
-
- return indexFileMetas;
- }
-
- private List<IndexFileMeta> createBTreeIndexForRange(
- FileIO fileIO,
- GlobalIndexer globalIndexer,
- DataField indexField,
- List<InternalRow> rowChunk,
- GlobalIndexFileReadWrite indexFileReadWrite,
- Range range)
- throws Exception {
- // collect all data and sort by index field.
- // this will be done distributedly by Spark/Flink in production.
- List<InternalRow> rows = new ArrayList<>(rowChunk);
- rows.sort(Comparator.comparing(row -> row.getString(0).toString()));
-
- int fileNum = 10;
- int targetFileSize = rows.size() / fileNum;
- int currentOffset = 0;
-
- List<IndexFileMeta> indexFileMetas = new ArrayList<>();
- for (int i = 0; i < fileNum; i++) {
- // write btree file for each data chunk.
- GlobalIndexParallelWriter writer =
- (GlobalIndexParallelWriter) globalIndexer.createWriter(indexFileReadWrite);
- for (int j = currentOffset;
- j < Math.min(currentOffset + targetFileSize, rows.size());
- j++) {
- InternalRow row = rows.get(j);
- writer.write(row.getString(0), row.getLong(1) - range.from);
- }
- currentOffset += targetFileSize;
-
- List<ResultEntry> entries = writer.finish();
- Assertions.assertThat(entries).hasSize(1);
- ResultEntry entry = entries.get(0);
-
- String fileName = entry.fileName();
- long fileSize = indexFileReadWrite.fileSize(fileName);
- GlobalIndexMeta globalIndexMeta =
- new GlobalIndexMeta(range.from, range.to, indexField.id(), null, entry.meta());
-
- indexFileMetas.add(
- new IndexFileMeta(
- BTreeGlobalIndexerFactory.IDENTIFIER,
- fileName,
- fileSize,
- entry.rowCount(),
- globalIndexMeta,
- null));
- }
-
- return indexFileMetas;
- }
-
private RoaringNavigableMap64 globalIndexScan(FileStoreTable table, Predicate predicate)
throws Exception {
GlobalIndexScanBuilder indexScanBuilder = table.store().newGlobalIndexScanBuilder();
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
new file mode 100644
index 0000000000..b4f30f82e9
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.globalindex.DataEvolutionBatchScan;
+import org.apache.paimon.globalindex.GlobalIndexResult;
+import org.apache.paimon.globalindex.GlobalIndexScanBuilder;
+import org.apache.paimon.globalindex.RowRangeGlobalIndexScanner;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexerFactory;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RoaringNavigableMap64;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/** Test for BTree indexed batch scan. */
+public class BtreeGlobalIndexTableTest extends DataEvolutionTestBase {
+
+ @Test
+ public void testBTreeGlobalIndex() throws Exception {
+ write(100000L);
+ createIndex("f1");
+
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+
+ Predicate predicate =
+ new PredicateBuilder(table.rowType()).equal(1, BinaryString.fromString("a100"));
+
+ RoaringNavigableMap64 rowIds = globalIndexScan(table, predicate);
+ assertNotNull(rowIds);
+ Assertions.assertThat(rowIds.getLongCardinality()).isEqualTo(1);
+ Assertions.assertThat(rowIds.toRangeList()).containsExactly(new Range(100L, 100L));
+
+ Predicate predicate2 =
+ new PredicateBuilder(table.rowType())
+ .in(
+ 1,
+ Arrays.asList(
+ BinaryString.fromString("a200"),
+ BinaryString.fromString("a300"),
+ BinaryString.fromString("a400")));
+
+ rowIds = globalIndexScan(table, predicate2);
+ assertNotNull(rowIds);
+ Assertions.assertThat(rowIds.getLongCardinality()).isEqualTo(3);
+ Assertions.assertThat(rowIds.toRangeList())
+ .containsExactlyInAnyOrder(
+ new Range(200L, 200L), new Range(300L, 300L), new Range(400L, 400L));
+
+ DataEvolutionBatchScan scan = (DataEvolutionBatchScan) table.newScan();
+ RoaringNavigableMap64 finalRowIds = rowIds;
+ scan.withGlobalIndexResult(GlobalIndexResult.create(() -> finalRowIds));
+
+ List<String> readF1 = new ArrayList<>();
+ table.newRead()
+ .createReader(scan.plan())
+ .forEachRemaining(
+ row -> {
+ readF1.add(row.getString(1).toString());
+ });
+
+ Assertions.assertThat(readF1).containsExactly("a200", "a300", "a400");
+ }
+
+ @Test
+ public void testBTreeGlobalIndexWithCoreScan() throws Exception {
+ write(100000L);
+ createIndex("f1");
+
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+
+ Predicate predicate =
+ new PredicateBuilder(table.rowType())
+ .in(
+ 1,
+ Arrays.asList(
+ BinaryString.fromString("a200"),
+ BinaryString.fromString("a300"),
+ BinaryString.fromString("a400"),
+ BinaryString.fromString("a56789")));
+
+ ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
+
+ List<String> readF1 = new ArrayList<>();
+ readBuilder
+ .newRead()
+ .createReader(readBuilder.newScan().plan())
+ .forEachRemaining(
+ row -> {
+ readF1.add(row.getString(1).toString());
+ });
+
+ Assertions.assertThat(readF1).containsExactly("a200", "a300", "a400", "a56789");
+ }
+
+ @Test
+ public void testMultipleBTreeIndices() throws Exception {
+ write(100000L);
+ createIndex("f1");
+ createIndex("f2");
+
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+ Predicate predicate1 =
+ new PredicateBuilder(table.rowType())
+ .in(
+ 1,
+ Arrays.asList(
+ BinaryString.fromString("a200"),
+ BinaryString.fromString("a300"),
+ BinaryString.fromString("a56789")));
+
+ Predicate predicate2 =
+ new PredicateBuilder(table.rowType())
+ .in(
+ 2,
+ Arrays.asList(
+ BinaryString.fromString("b200"),
+ BinaryString.fromString("b400"),
+ BinaryString.fromString("b56789")));
+
+ Predicate predicate = PredicateBuilder.and(predicate1, predicate2);
+ ReadBuilder readBuilder = table.newReadBuilder().withFilter(predicate);
+
+ List<String> result = new ArrayList<>();
+ readBuilder
+ .newRead()
+ .createReader(readBuilder.newScan().plan())
+ .forEachRemaining(
+ row -> {
+ result.add(row.getString(1).toString());
+ });
+
+ Assertions.assertThat(result).containsExactly("a200", "a56789");
+ }
+
+ private void createIndex(String fieldName) throws Exception {
+ FileStoreTable table = (FileStoreTable) catalog.getTable(identifier());
+ BTreeGlobalIndexBuilder builder =
+ new BTreeGlobalIndexBuilder(table)
+ .withIndexType(BTreeGlobalIndexerFactory.IDENTIFIER)
+ .withIndexField(fieldName);
+ List<CommitMessage> commitMessages = builder.build(builder.scan(), ioManager);
+ try (BatchTableCommit commit = table.newBatchWriteBuilder().newCommit()) {
+ commit.commit(commitMessages);
+ }
+ }
+
+ private RoaringNavigableMap64 globalIndexScan(FileStoreTable table, Predicate predicate)
+ throws Exception {
+ GlobalIndexScanBuilder indexScanBuilder = table.store().newGlobalIndexScanBuilder();
+ List<Range> ranges = indexScanBuilder.shardList();
+ GlobalIndexResult globalFileIndexResult = GlobalIndexResult.createEmpty();
+ for (Range range : ranges) {
+ try (RowRangeGlobalIndexScanner scanner =
+ indexScanBuilder.withRowRange(range).build()) {
+ Optional<GlobalIndexResult> globalIndexResult = scanner.scan(predicate, null);
+ if (!globalIndexResult.isPresent()) {
+ throw new RuntimeException("Can't find index result by scan");
+ }
+ globalFileIndexResult = globalFileIndexResult.or(globalIndexResult.get());
+ }
+ }
+
+ return globalFileIndexResult.results();
+ }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
index a533017779..1a423aeb1e 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/TableTestBase.java
@@ -28,6 +28,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.fs.Path;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.reader.RecordReader;
@@ -75,6 +76,7 @@ public abstract class TableTestBase {
protected Catalog catalog;
protected String database;
@TempDir public java.nio.file.Path tempPath;
+ protected IOManager ioManager;
@BeforeEach
public void beforeEach() throws Catalog.DatabaseAlreadyExistException {
@@ -82,6 +84,7 @@ public abstract class TableTestBase {
warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString());
catalog = CatalogFactory.createCatalog(CatalogContext.create(warehouse));
catalog.createDatabase(database, true);
+ this.ioManager = new IOManagerImpl(tempPath.toString());
}
@AfterEach
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
index ed172b5e58..b077e9dcdb 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/lookup/LookupTableTest.java
@@ -25,7 +25,6 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.JoinedRow;
import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.IOManagerImpl;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.lookup.FullCacheLookupTable.TableBulkLoader;
import org.apache.paimon.fs.FileIO;
@@ -89,7 +88,6 @@ public class LookupTableTest extends TableTestBase {
@TempDir Path tempDir;
private RowType rowType;
- private IOManager ioManager;
private FullCacheLookupTable table;
public LookupTableTest(boolean inMemory) {
@@ -105,7 +103,6 @@ public class LookupTableTest extends TableTestBase {
@BeforeEach
public void before() throws IOException {
this.rowType = RowType.of(new IntType(), new IntType(), new IntType());
- this.ioManager = new IOManagerImpl(tempDir.toString());
}
@AfterEach
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
index e0a6c0e353..fc8c6b83cb 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
@@ -39,8 +39,8 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.List;
-import static org.apache.paimon.spark.globalindex.GlobalIndexBuilderUtils.createIndexWriter;
-import static org.apache.paimon.spark.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas;
+import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.createIndexWriter;
+import static org.apache.paimon.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas;
/** Default global index builder. */
public class DefaultGlobalIndexBuilder implements Serializable {
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
deleted file mode 100644
index 0f520c5884..0000000000
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java
+++ /dev/null
@@ -1,154 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.spark.globalindex.btree;
-
-import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
-import org.apache.paimon.globalindex.GlobalIndexWriter;
-import org.apache.paimon.globalindex.ResultEntry;
-import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
-import org.apache.paimon.index.IndexFileMeta;
-import org.apache.paimon.io.CompactIncrement;
-import org.apache.paimon.io.DataIncrement;
-import org.apache.paimon.options.Options;
-import org.apache.paimon.spark.globalindex.RowIdIndexFieldsExtractor;
-import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.SpecialFields;
-import org.apache.paimon.table.sink.CommitMessage;
-import org.apache.paimon.table.sink.CommitMessageImpl;
-import org.apache.paimon.types.DataField;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.CloseableIterator;
-import org.apache.paimon.utils.Range;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Objects;
-
-import static org.apache.paimon.spark.globalindex.GlobalIndexBuilderUtils.createIndexWriter;
-import static org.apache.paimon.spark.globalindex.GlobalIndexBuilderUtils.toIndexFileMetas;
-
-/**
- * A global index builder implementation for BTree Index. The caller of {@link
- * #build(CloseableIterator) build} must ensure the input data is sorted by partitions and indexed
- * field.
- */
-public class BTreeGlobalIndexBuilder implements Serializable {
-
- private static final long serialVersionUID = 1L;
-
- private static final double FLOATING = 1.2;
-
- private final FileStoreTable table;
- private final DataField indexField;
- private final String indexType;
- private final Range rowRange;
- private final Options options;
- private final RowIdIndexFieldsExtractor extractor;
- private final long recordsPerRange;
-
- public BTreeGlobalIndexBuilder(
- FileStoreTable table,
- RowType readType,
- DataField indexField,
- String indexType,
- Range rowRange,
- Options options) {
- this.table = table;
- this.indexField = indexField;
- this.indexType = indexType;
- this.rowRange = rowRange;
- this.options = options;
- List<String> readColumns = new ArrayList<>(table.partitionKeys());
- readColumns.addAll(readType.getFieldNames());
- this.extractor =
- new RowIdIndexFieldsExtractor(
- SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns),
- table.partitionKeys(),
- indexField.name());
-
- // Each partition boundary is derived from sampling, so we introduce a slack factor
- // to avoid generating too many small files due to sampling variance.
- this.recordsPerRange =
- (long) (options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) * FLOATING);
- }
-
- public List<CommitMessage> build(CloseableIterator<InternalRow> data) throws IOException {
- long counter = 0;
- BinaryRow currentPart = null;
- GlobalIndexParallelWriter currentWriter = null;
- List<CommitMessage> commitMessages = new ArrayList<>();
-
- while (data.hasNext()) {
- InternalRow row = data.next();
- BinaryRow partRow = extractor.extractPartition(row);
-
- // the input is sorted by <partition, indexedField>
- if (currentWriter != null) {
- if (!Objects.equals(partRow, currentPart) || counter >= recordsPerRange) {
- commitMessages.add(flushIndex(currentWriter.finish(), currentPart));
- currentWriter = null;
- counter = 0;
- }
- }
-
- // write <value, rowId> pair to index file
- currentPart = partRow;
- counter++;
-
- if (currentWriter == null) {
- currentWriter = createWriter();
- }
-
- // convert the original rowId to local rowId
- long localRowId = extractor.extractRowId(row) - rowRange.from;
- currentWriter.write(extractor.extractIndexField(row), localRowId);
- }
-
- if (counter > 0) {
- commitMessages.add(flushIndex(currentWriter.finish(), currentPart));
- }
-
- return commitMessages;
- }
-
- private GlobalIndexParallelWriter createWriter() throws IOException {
- GlobalIndexParallelWriter currentWriter;
- GlobalIndexWriter indexWriter = createIndexWriter(table, indexType, indexField, options);
- if (!(indexWriter instanceof GlobalIndexParallelWriter)) {
- throw new RuntimeException(
- "Unexpected implementation, the index writer of BTree should be an instance of GlobalIndexParallelWriter, but found: "
- + indexWriter.getClass().getName());
- }
- currentWriter = (GlobalIndexParallelWriter) indexWriter;
- return currentWriter;
- }
-
- private CommitMessage flushIndex(List<ResultEntry> resultEntries, BinaryRow partition)
- throws IOException {
- List<IndexFileMeta> indexFileMetas =
- toIndexFileMetas(table, rowRange, indexField.id(), indexType, resultEntries);
- DataIncrement dataIncrement = DataIncrement.indexIncrement(indexFileMetas);
- return new CommitMessageImpl(
- partition, 0, null, dataIncrement, CompactIncrement.emptyIncrement());
- }
-}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
index 68e77cbf14..0b626da8a3 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -19,8 +19,8 @@
package org.apache.paimon.spark.globalindex.btree;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
-import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.PartitionPredicate;
import org.apache.paimon.spark.SparkRow;
@@ -31,10 +31,8 @@ import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.table.source.DataSplit;
-import org.apache.paimon.table.source.snapshot.SnapshotReader;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.InstantiationUtil;
import org.apache.paimon.utils.Range;
@@ -54,6 +52,8 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import static org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.calcRowRange;
+
/** The {@link GlobalIndexTopologyBuilder} for BTree index. */
public class BTreeIndexTopoBuilder implements GlobalIndexTopologyBuilder {
@@ -73,16 +73,18 @@ public class BTreeIndexTopoBuilder implements GlobalIndexTopologyBuilder {
DataField indexField,
Options options)
throws IOException {
-
// 1. read the whole dataset of target partitions
- SnapshotReader snapshotReader = table.newSnapshotReader();
+ BTreeGlobalIndexBuilder indexBuilder =
+ new BTreeGlobalIndexBuilder(table)
+ .withIndexType(indexType)
+ .withIndexField(indexField.name());
if (partitionPredicate != null) {
- snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate);
+ indexBuilder = indexBuilder.withPartitionPredicate(partitionPredicate);
}
- List<DataSplit> dataSplits = snapshotReader.read().dataSplits();
- Range range = calcRowRange(dataSplits);
- if (dataSplits.isEmpty() || range == null) {
+ List<DataSplit> splits = indexBuilder.scan();
+ Range range = calcRowRange(splits);
+ if (splits.isEmpty() || range == null) {
return Collections.emptyList();
}
@@ -95,7 +97,7 @@ public class BTreeIndexTopoBuilder implements GlobalIndexTopologyBuilder {
PaimonUtils.createDataset(
spark,
ScanPlanHelper$.MODULE$.createNewScanPlan(
- dataSplits.toArray(new DataSplit[0]), relation));
+ splits.toArray(new DataSplit[0]), relation));
Dataset<Row> selected =
source.select(selectedColumns.stream().map(functions::col).toArray(Column[]::new));
@@ -124,10 +126,7 @@ public class BTreeIndexTopoBuilder implements GlobalIndexTopologyBuilder {
.sortWithinPartitions(sortFields);
// 3. write index for each partition & range
- final byte[] builderBytes =
- InstantiationUtil.serializeObject(
- new BTreeGlobalIndexBuilder(
- table, readType, indexField, indexType, range, options));
+ final byte[] builderBytes = InstantiationUtil.serializeObject(indexBuilder);
final RowType rowType =
SpecialFields.rowTypeWithRowId(table.rowType()).project(selectedColumns);
JavaRDD<byte[]> written =
@@ -145,9 +144,7 @@ public class BTreeIndexTopoBuilder implements GlobalIndexTopologyBuilder {
BTreeGlobalIndexBuilder.class
.getClassLoader());
List<CommitMessage> commitMessages =
- builder.build(
- CloseableIterator.adapterForIterator(
- iter));
+ builder.build(range, iter);
List<byte[]> messageBytes = new ArrayList<>();
for (CommitMessage commitMessage : commitMessages) {
@@ -171,17 +168,4 @@ public class BTreeIndexTopoBuilder implements GlobalIndexTopologyBuilder {
return result;
}
-
- private Range calcRowRange(List<DataSplit> dataSplits) {
- long start = Long.MAX_VALUE;
- long end = Long.MIN_VALUE;
- for (DataSplit dataSplit : dataSplits) {
- for (DataFileMeta file : dataSplit.dataFiles()) {
- long firstRowId = file.nonNullFirstRowId();
- start = Math.min(start, firstRowId);
- end = Math.max(end, firstRowId + file.rowCount());
- }
- }
- return start == Long.MAX_VALUE ? null : new Range(start, end);
- }
}