This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 18a770106f [flink] [spark] Fix btree procedure GlobalIndexMeta row
range not accurate (#7354)
18a770106f is described below
commit 18a770106f9360e1755a5b17a066d9a4cc3c2a49
Author: YeJunHao <[email protected]>
AuthorDate: Tue Mar 10 18:08:43 2026 +0800
[flink] [spark] Fix btree procedure GlobalIndexMeta row range not accurate
(#7354)
---
.../globalindex/GlobalIndexBuilderUtils.java | 11 +-
.../globalindex/btree/BTreeGlobalIndexBuilder.java | 228 ++++++++++++++++-----
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 9 +-
.../btree/BTreeGlobalIndexBuilderSplitTest.java | 91 ++++++++
.../btree/BTreeGlobalIndexBuilderTest.java | 7 +-
.../paimon/table/BtreeGlobalIndexTableTest.java | 23 ++-
.../paimon/flink/btree/BTreeIndexTopoBuilder.java | 198 +++++++++++++-----
.../paimon/flink/BTreeGlobalIndexITCase.java | 74 +++++++
.../globalindex/DefaultGlobalIndexBuilder.java | 9 +-
.../globalindex/btree/BTreeIndexTopoBuilder.java | 137 ++++++++-----
.../procedure/CreateGlobalIndexProcedure.java | 6 +-
.../procedure/CreateGlobalIndexProcedureTest.scala | 11 +-
12 files changed, 617 insertions(+), 187 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
index b53a0641ab..085423efa8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/GlobalIndexBuilderUtils.java
@@ -18,6 +18,8 @@
package org.apache.paimon.globalindex;
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.index.GlobalIndexMeta;
import org.apache.paimon.index.IndexFileMeta;
@@ -35,7 +37,9 @@ import java.util.List;
public class GlobalIndexBuilderUtils {
public static List<IndexFileMeta> toIndexFileMetas(
- FileStoreTable table,
+ FileIO fileIO,
+ IndexPathFactory indexPathFactory,
+ CoreOptions options,
Range range,
int indexFieldId,
String indexType,
@@ -44,12 +48,11 @@ public class GlobalIndexBuilderUtils {
List<IndexFileMeta> results = new ArrayList<>();
for (ResultEntry entry : entries) {
String fileName = entry.fileName();
- GlobalIndexFileReadWrite readWrite =
createGlobalIndexFileReadWrite(table);
- long fileSize = readWrite.fileSize(fileName);
+ long fileSize =
fileIO.getFileSize(indexPathFactory.toPath(fileName));
GlobalIndexMeta globalIndexMeta =
new GlobalIndexMeta(range.from, range.to, indexFieldId,
null, entry.meta());
- Path externalPathDir =
table.coreOptions().globalIndexExternalPath();
+ Path externalPathDir = options.globalIndexExternalPath();
String externalPathString = null;
if (externalPathDir != null) {
Path externalPath = new Path(externalPathDir, fileName);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
index 08b31d06b4..4c051741b2 100644
---
a/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
+++
b/paimon-core/src/main/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilder.java
@@ -19,13 +19,14 @@
package org.apache.paimon.globalindex.btree;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
import org.apache.paimon.globalindex.GlobalIndexWriter;
import org.apache.paimon.globalindex.ResultEntry;
-import org.apache.paimon.globalindex.RowIdIndexFieldsExtractor;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.io.CompactIncrement;
import org.apache.paimon.io.DataFileMeta;
@@ -46,8 +47,10 @@ import org.apache.paimon.types.DataField;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CloseableIterator;
import org.apache.paimon.utils.MutableObjectIteratorAdapter;
+import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.Range;
+import org.apache.paimon.utils.RangeHelper;
import javax.annotation.Nullable;
@@ -55,9 +58,13 @@ import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedHashMap;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.function.Supplier;
import java.util.stream.IntStream;
import static java.util.Collections.singletonList;
@@ -82,13 +89,12 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
// readRowType is composed by partition fields, indexed field and _ROW_ID
field
private RowType readRowType;
- private RowIdIndexFieldsExtractor extractor;
@Nullable private PartitionPredicate partitionPredicate;
public BTreeGlobalIndexBuilder(Table table) {
this.table = (FileStoreTable) table;
- this.rowType = table.rowType();
+ this.rowType = this.table.rowType();
this.options = this.table.coreOptions().toConfiguration();
this.recordsPerRange =
(long)
(options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) * FLOATING);
@@ -110,14 +116,11 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
indexField,
table.fullName());
this.indexField = rowType.getField(indexField);
- List<String> readColumns = new ArrayList<>(table.partitionKeys());
- readColumns.addAll(
- SpecialFields.rowTypeWithRowId(new
RowType(singletonList(this.indexField)))
- .getFieldNames());
+ List<String> readColumns = new ArrayList<>();
+ readColumns.add(this.indexField.name());
+ readColumns.add(SpecialFields.ROW_ID.name());
+
this.readRowType =
SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns);
- this.extractor =
- new RowIdIndexFieldsExtractor(
- this.readRowType, table.partitionKeys(),
this.indexField.name());
return this;
}
@@ -136,12 +139,10 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
return snapshotReader.read().dataSplits();
}
- public List<CommitMessage> build(List<DataSplit> splits, IOManager
ioManager)
- throws IOException {
- Range rowRange = calcRowRange(splits);
- if (splits.isEmpty() || rowRange == null) {
- return Collections.emptyList();
- }
+ @VisibleForTesting
+ public List<CommitMessage> build(DataSplit split, IOManager ioManager)
throws IOException {
+ BinaryRow partition = split.partition();
+ Range rowRange = calcRowRange(split);
CoreOptions options = new CoreOptions(this.options);
BinaryExternalSortBuffer buffer =
@@ -157,7 +158,7 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
options.writeBufferSpillDiskSize(),
options.sequenceFieldSortOrderIsAscending());
- List<Split> splitList = new ArrayList<>(splits);
+ List<Split> splitList = Collections.singletonList(split);
RecordReader<InternalRow> reader =
table.newReadBuilder().withReadType(readRowType).newRead().createReader(splitList);
try (CloseableIterator<InternalRow> iterator =
reader.toCloseableIterator()) {
@@ -172,7 +173,7 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
Iterator<InternalRow> iterator =
new MutableObjectIteratorAdapter<>(
buffer.sortedIterator(), new
BinaryRow(readRowType.getFieldCount()));
- List<CommitMessage> result = build(rowRange, iterator);
+ List<CommitMessage> result = buildForSinglePartition(rowRange,
partition, iterator);
buffer.clear();
return result;
@@ -182,47 +183,34 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
return recordsPerRange;
}
- public RowIdIndexFieldsExtractor extractor() {
- return extractor;
- }
-
- public List<CommitMessage> build(Range rowRange, Iterator<InternalRow>
data)
- throws IOException {
+ public List<CommitMessage> buildForSinglePartition(
+ Range rowRange, BinaryRow partition, Iterator<InternalRow> data)
throws IOException {
long counter = 0;
- BinaryRow currentPart = null;
GlobalIndexParallelWriter currentWriter = null;
List<CommitMessage> commitMessages = new ArrayList<>();
+ FieldGetter indexFieldGetter =
InternalRow.createFieldGetter(indexField.type(), 0);
while (data.hasNext()) {
InternalRow row = data.next();
- BinaryRow partRow = extractor.extractPartition(row);
-
- // the input is sorted by <partition, indexedField>
- if (currentWriter != null) {
- if (!Objects.equals(partRow, currentPart) || counter >=
recordsPerRange) {
- commitMessages.add(flushIndex(rowRange,
currentWriter.finish(), currentPart));
- currentWriter = null;
- counter = 0;
- }
+
+ if (currentWriter != null && counter >= recordsPerRange) {
+ commitMessages.add(flushIndex(rowRange,
currentWriter.finish(), partition));
+ currentWriter = null;
+ counter = 0;
}
- // write <value, rowId> pair to index file
- currentPart = partRow;
counter++;
-
if (currentWriter == null) {
currentWriter = createWriter();
}
- // convert the original rowId to local rowId
- long localRowId = extractor.extractRowId(row) - rowRange.from;
- currentWriter.write(extractor.extractIndexField(row), localRowId);
+ long localRowId = row.getLong(1) - rowRange.from;
+ currentWriter.write(indexFieldGetter.getFieldOrNull(row),
localRowId);
}
if (counter > 0) {
- commitMessages.add(flushIndex(rowRange, currentWriter.finish(),
currentPart));
+ commitMessages.add(flushIndex(rowRange, currentWriter.finish(),
partition));
}
-
return commitMessages;
}
@@ -242,22 +230,160 @@ public class BTreeGlobalIndexBuilder implements
Serializable {
Range rowRange, List<ResultEntry> resultEntries, BinaryRow
partition)
throws IOException {
List<IndexFileMeta> indexFileMetas =
- toIndexFileMetas(table, rowRange, indexField.id(), indexType,
resultEntries);
+ toIndexFileMetas(
+ table.fileIO(),
+ table.store().pathFactory().globalIndexFileFactory(),
+ table.coreOptions(),
+ rowRange,
+ indexField.id(),
+ indexType,
+ resultEntries);
DataIncrement dataIncrement =
DataIncrement.indexIncrement(indexFileMetas);
return new CommitMessageImpl(
partition, 0, null, dataIncrement,
CompactIncrement.emptyIncrement());
}
- public static Range calcRowRange(List<DataSplit> dataSplits) {
- long start = Long.MAX_VALUE;
- long end = Long.MIN_VALUE;
+ public static Range calcRowRange(DataSplit dataSplit) {
+ List<Range> ranges = calcRowRanges(singletonList(dataSplit));
+ if (ranges.isEmpty()) {
+ return null;
+ }
+ return new Range(ranges.get(0).from, ranges.get(ranges.size() - 1).to);
+ }
+
+ public static List<Range> calcRowRanges(List<DataSplit> dataSplits) {
+ List<Range> ranges = new ArrayList<>();
for (DataSplit dataSplit : dataSplits) {
for (DataFileMeta file : dataSplit.dataFiles()) {
- Range range = file.nonNullRowIdRange();
- start = Math.min(start, range.from);
- end = Math.max(end, range.to);
+ ranges.add(file.nonNullRowIdRange());
}
}
- return start == Long.MAX_VALUE ? null : new Range(start, end);
+ return Range.sortAndMergeOverlap(ranges, true);
+ }
+
+ public static List<DataSplit> splitByContiguousRowRange(List<DataSplit>
splits) {
+ List<DataSplit> result = new ArrayList<>();
+ for (DataSplit split : splits) {
+ result.addAll(splitByContiguousRowRange(split));
+ }
+ return result;
+ }
+
+ public static Map<BinaryRow, Map<Range, List<DataSplit>>>
groupSplitsByRange(
+ List<DataSplit> splits) {
+ Map<BinaryRow, List<Pair<Range, DataSplit>>> partitionSplitRanges =
new HashMap<>();
+ for (DataSplit split : splits) {
+ Range splitRange = calcRowRange(split);
+ if (splitRange == null) {
+ continue;
+ }
+ BinaryRow partition = split.partition();
+ partitionSplitRanges
+ .computeIfAbsent(partition, p -> new ArrayList<>())
+ .add(Pair.of(splitRange, split));
+ }
+
+ Map<BinaryRow, Map<Range, List<DataSplit>>> result = new HashMap<>();
+ for (Map.Entry<BinaryRow, List<Pair<Range, DataSplit>>> partitionEntry
:
+ partitionSplitRanges.entrySet()) {
+ List<Pair<Range, DataSplit>> splitRanges =
partitionEntry.getValue();
+ splitRanges.sort(
+ Comparator.comparingLong((Pair<Range, DataSplit> e) ->
e.getKey().from)
+ .thenComparingLong(e -> e.getKey().to));
+
+ Map<Range, List<DataSplit>> partitionRanges = new
LinkedHashMap<>();
+ Range current = null;
+ List<DataSplit> currentSplits = new ArrayList<>();
+ for (Map.Entry<Range, DataSplit> entry : splitRanges) {
+ Range splitRange = entry.getKey();
+ if (current == null) {
+ current = splitRange;
+ currentSplits.add(entry.getValue());
+ continue;
+ }
+ Range merged = Range.union(current, splitRange);
+ if (merged != null) {
+ current = merged;
+ currentSplits.add(entry.getValue());
+ } else {
+ partitionRanges.put(current, currentSplits);
+ current = splitRange;
+ currentSplits = new ArrayList<>();
+ currentSplits.add(entry.getValue());
+ }
+ }
+ if (current != null) {
+ partitionRanges.put(current, currentSplits);
+ }
+ result.put(partitionEntry.getKey(), partitionRanges);
+ }
+
+ return result;
+ }
+
+ private static List<DataSplit> splitByContiguousRowRange(DataSplit split) {
+ List<DataFileMeta> input = split.dataFiles();
+ RangeHelper<DataFileMeta> rangeHelper = new
RangeHelper<>(DataFileMeta::nonNullRowIdRange);
+ List<List<DataFileMeta>> ranges =
rangeHelper.mergeOverlappingRanges(input);
+
+ Supplier<DataSplit.Builder> builderSupplier =
+ () ->
+ DataSplit.builder()
+ .withSnapshot(split.snapshotId())
+ .withPartition(split.partition())
+ .withBucket(split.bucket())
+ .withBucketPath(split.bucketPath())
+ .withTotalBuckets(split.totalBuckets())
+ .isStreaming(split.isStreaming())
+ .rawConvertible(split.rawConvertible());
+ return packByContiguousRanges(builderSupplier, ranges);
+ }
+
+ private static List<DataSplit> packByContiguousRanges(
+ Supplier<DataSplit.Builder> builderFactory,
List<List<DataFileMeta>> ranges) {
+ if (ranges.isEmpty()) {
+ return new ArrayList<>();
+ }
+
+ List<DataSplit> result = new ArrayList<>();
+ List<DataFileMeta> currentSegment = new ArrayList<>();
+ long currentMaxRowId = Long.MIN_VALUE;
+
+ for (List<DataFileMeta> rangeFiles : ranges) {
+ long minRowId = minRowId(rangeFiles);
+ long maxRowId = maxRowId(rangeFiles);
+ if (currentSegment.isEmpty() || areContiguous(currentMaxRowId,
minRowId)) {
+ currentSegment.addAll(rangeFiles);
+ currentMaxRowId = maxRowId;
+ } else {
+ DataSplit.Builder builder = builderFactory.get();
+ builder.withDataFiles(currentSegment);
+ result.add(builder.build());
+ currentSegment = new ArrayList<>(rangeFiles);
+ currentMaxRowId = maxRowId;
+ }
+ }
+
+ DataSplit.Builder builder = builderFactory.get();
+ builder.withDataFiles(currentSegment);
+ result.add(builder.build());
+ return result;
+ }
+
+ private static long minRowId(List<DataFileMeta> files) {
+ return files.stream()
+ .mapToLong(f -> f.nonNullRowIdRange().from)
+ .min()
+ .orElse(Long.MAX_VALUE);
+ }
+
+ private static long maxRowId(List<DataFileMeta> files) {
+ return files.stream().mapToLong(f ->
f.nonNullRowIdRange().to).max().orElse(Long.MIN_VALUE);
+ }
+
+ private static boolean areContiguous(long previousMaxRowId, long
currentMinRowId) {
+ // Contiguous means no gap between adjacent ranges.
+ // e.g. previous max == current min (as requested) or previous max + 1
== current min.
+ return previousMaxRowId >= currentMinRowId - 1;
}
}
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index c63456a2c2..a9bcfc9bbf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -515,7 +515,8 @@ public class JavaPyE2ETest {
BTreeGlobalIndexBuilder builder =
new
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
try (BatchTableCommit commit = writeBuilder.newCommit()) {
- commit.commit(builder.build(builder.scan(),
IOManager.create(warehouse.toString())));
+ commit.commit(
+ builder.build(builder.scan().get(0),
IOManager.create(warehouse.toString())));
}
// assert index
@@ -583,7 +584,8 @@ public class JavaPyE2ETest {
BTreeGlobalIndexBuilder builder =
new
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
try (BatchTableCommit commit = writeBuilder.newCommit()) {
- commit.commit(builder.build(builder.scan(),
IOManager.create(warehouse.toString())));
+ commit.commit(
+ builder.build(builder.scan().get(0),
IOManager.create(warehouse.toString())));
}
// assert index
@@ -653,7 +655,8 @@ public class JavaPyE2ETest {
BTreeGlobalIndexBuilder builder =
new
BTreeGlobalIndexBuilder(table).withIndexType("btree").withIndexField("k");
try (BatchTableCommit commit = writeBuilder.newCommit()) {
- commit.commit(builder.build(builder.scan(),
IOManager.create(warehouse.toString())));
+ commit.commit(
+ builder.build(builder.scan().get(0),
IOManager.create(warehouse.toString())));
}
// assert index
diff --git
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderSplitTest.java
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderSplitTest.java
new file mode 100644
index 0000000000..3ea2809e1c
--- /dev/null
+++
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderSplitTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.globalindex.btree;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.PojoDataFileMeta;
+import org.apache.paimon.stats.SimpleStats;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.utils.Range;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for split regrouping in {@link BTreeGlobalIndexBuilder}. */
+public class BTreeGlobalIndexBuilderSplitTest {
+
+ @Test
+ public void testSplitByContiguousRowRangeFromDataFiles() {
+ DataFileMeta file1 = createDataFileMeta(0L, 100L);
+ DataFileMeta file2 = createDataFileMeta(300L, 100L);
+ DataFileMeta file3 = createDataFileMeta(100L, 100L);
+ DataSplit split =
+ DataSplit.builder()
+ .withSnapshot(1L)
+ .withPartition(BinaryRow.EMPTY_ROW)
+ .withBucket(0)
+ .withBucketPath("bucket-0")
+ .withDataFiles(Arrays.asList(file1, file2, file3))
+ .isStreaming(false)
+ .rawConvertible(false)
+ .build();
+
+ List<DataSplit> rebuilt =
+
BTreeGlobalIndexBuilder.splitByContiguousRowRange(Collections.singletonList(split));
+
+ assertThat(rebuilt).hasSize(2);
+ assertThat(rebuilt.get(0).dataFiles()).containsExactly(file1, file3);
+ assertThat(rebuilt.get(1).dataFiles()).containsExactly(file2);
+ assertThat(BTreeGlobalIndexBuilder.calcRowRange(rebuilt.get(0)))
+ .isEqualTo(new Range(0, 199));
+ assertThat(BTreeGlobalIndexBuilder.calcRowRange(rebuilt.get(1)))
+ .isEqualTo(new Range(300, 399));
+ }
+
+ private static DataFileMeta createDataFileMeta(long firstRowId, long
rowCount) {
+ return new PojoDataFileMeta(
+ "test-file-" + UUID.randomUUID(),
+ 1024L,
+ rowCount,
+ BinaryRow.EMPTY_ROW,
+ BinaryRow.EMPTY_ROW,
+ SimpleStats.EMPTY_STATS,
+ SimpleStats.EMPTY_STATS,
+ 0L,
+ 0L,
+ 0L,
+ 0,
+ Collections.emptyList(),
+ null,
+ null,
+ null,
+ null,
+ null,
+ null,
+ firstRowId,
+ null);
+ }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
index d37ad973da..bd748edb53 100644
---
a/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/globalindex/btree/BTreeGlobalIndexBuilderTest.java
@@ -37,6 +37,7 @@ import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Pair;
@@ -107,7 +108,11 @@ public class BTreeGlobalIndexBuilderTest extends
TableTestBase {
builder.withIndexField("f0");
builder.withIndexType("btree");
builder.withPartitionPredicate(partitionPredicate);
- List<CommitMessage> commitMessages = builder.build(builder.scan(),
ioManager);
+ List<DataSplit> dataSplits = builder.scan();
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ for (DataSplit dataSplit : dataSplits) {
+ commitMessages.addAll(builder.build(dataSplit, ioManager));
+ }
try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
commit.commit(commitMessages);
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
index b4f30f82e9..9b1ace20a0 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/BtreeGlobalIndexTableTest.java
@@ -29,11 +29,11 @@ import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.utils.Range;
import org.apache.paimon.utils.RoaringNavigableMap64;
-import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
@@ -41,6 +41,7 @@ import java.util.Arrays;
import java.util.List;
import java.util.Optional;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertNotNull;
/** Test for BTree indexed batch scan. */
@@ -58,8 +59,8 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
RoaringNavigableMap64 rowIds = globalIndexScan(table, predicate);
assertNotNull(rowIds);
- Assertions.assertThat(rowIds.getLongCardinality()).isEqualTo(1);
- Assertions.assertThat(rowIds.toRangeList()).containsExactly(new
Range(100L, 100L));
+ assertThat(rowIds.getLongCardinality()).isEqualTo(1);
+ assertThat(rowIds.toRangeList()).containsExactly(new Range(100L,
100L));
Predicate predicate2 =
new PredicateBuilder(table.rowType())
@@ -72,8 +73,8 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
rowIds = globalIndexScan(table, predicate2);
assertNotNull(rowIds);
- Assertions.assertThat(rowIds.getLongCardinality()).isEqualTo(3);
- Assertions.assertThat(rowIds.toRangeList())
+ assertThat(rowIds.getLongCardinality()).isEqualTo(3);
+ assertThat(rowIds.toRangeList())
.containsExactlyInAnyOrder(
new Range(200L, 200L), new Range(300L, 300L), new
Range(400L, 400L));
@@ -89,7 +90,7 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
readF1.add(row.getString(1).toString());
});
- Assertions.assertThat(readF1).containsExactly("a200", "a300", "a400");
+ assertThat(readF1).containsExactly("a200", "a300", "a400");
}
@Test
@@ -120,7 +121,7 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
readF1.add(row.getString(1).toString());
});
- Assertions.assertThat(readF1).containsExactly("a200", "a300", "a400",
"a56789");
+ assertThat(readF1).containsExactly("a200", "a300", "a400", "a56789");
}
@Test
@@ -160,7 +161,7 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
result.add(row.getString(1).toString());
});
- Assertions.assertThat(result).containsExactly("a200", "a56789");
+ assertThat(result).containsExactly("a200", "a56789");
}
private void createIndex(String fieldName) throws Exception {
@@ -169,7 +170,11 @@ public class BtreeGlobalIndexTableTest extends
DataEvolutionTestBase {
new BTreeGlobalIndexBuilder(table)
.withIndexType(BTreeGlobalIndexerFactory.IDENTIFIER)
.withIndexField(fieldName);
- List<CommitMessage> commitMessages = builder.build(builder.scan(),
ioManager);
+ List<DataSplit> dataSplits = builder.scan();
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ for (DataSplit dataSplit : dataSplits) {
+ commitMessages.addAll(builder.build(dataSplit, ioManager));
+ }
try (BatchTableCommit commit =
table.newBatchWriteBuilder().newCommit()) {
commit.commit(commitMessages);
}
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
index 7ab2c69bd6..f69b0d704c 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/btree/BTreeIndexTopoBuilder.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink.btree;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.LogicalTypeConversion;
@@ -34,7 +35,6 @@ import org.apache.paimon.flink.sorter.TableSorter;
import org.apache.paimon.flink.utils.BoundedOneInputOperator;
import org.apache.paimon.flink.utils.JavaTypeInfo;
import org.apache.paimon.globalindex.GlobalIndexParallelWriter;
-import org.apache.paimon.globalindex.RowIdIndexFieldsExtractor;
import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
import org.apache.paimon.options.Options;
@@ -47,6 +47,7 @@ import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.DataSplit;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.Range;
@@ -60,9 +61,11 @@ import
org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import java.util.Objects;
+import java.util.Map;
+import java.util.UUID;
-import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.calcRowRange;
+import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.groupSplitsByRange;
+import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.splitByContiguousRowRange;
/** The {@link BTreeIndexTopoBuilder} for BTree index in Flink. */
public class BTreeIndexTopoBuilder {
@@ -83,45 +86,115 @@ public class BTreeIndexTopoBuilder {
indexBuilder =
indexBuilder.withPartitionPredicate(partitionPredicate);
}
- List<DataSplit> splits = indexBuilder.scan();
- Range range = calcRowRange(splits);
- if (splits.isEmpty() || range == null) {
+ List<DataSplit> splits =
splitByContiguousRowRange(indexBuilder.scan());
+ if (splits.isEmpty()) {
+ return;
+ }
+ Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
+ groupSplitsByRange(splits);
+ if (partitionRangeSplits.isEmpty()) {
return;
}
- // 2. Select necessary columns (partition keys + index field + ROW_ID)
- List<String> selectedColumns = new ArrayList<>(table.partitionKeys());
+ // 2. Select necessary columns (index field + ROW_ID)
+ List<String> selectedColumns = new ArrayList<>();
selectedColumns.add(indexColumn);
RowType readType =
SpecialFields.rowTypeWithRowId(table.rowType().project(selectedColumns));
+ int indexFieldPos = readType.getFieldIndex(indexColumn);
+ int rowIdPos = readType.getFieldIndex(SpecialFields.ROW_ID.name());
+ DataType indexFieldType = readType.getTypeAt(indexFieldPos);
- // 3. Calculate parallelism and sort
+ // 3. Calculate maximum parallelism bound
long recordsPerRange =
userOptions.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
- int parallelism = Math.max((int) (range.count() / recordsPerRange), 1);
int maxParallelism =
userOptions.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
+
+ // 4. Build one topology per contiguous row range
+ CoreOptions coreOptions = table.coreOptions();
+ ReadBuilder readBuilder =
table.newReadBuilder().withReadType(readType);
+ List<String> sortColumns = new ArrayList<>();
+ sortColumns.add(indexColumn);
+ DataStream<Committable> allCommitMessages = null;
+ int partitionFieldSize = table.partitionKeys().size();
+ BinaryRowSerializer binaryRowSerializer = new
BinaryRowSerializer(partitionFieldSize);
+ for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>> partitionEntry :
+ partitionRangeSplits.entrySet()) {
+ BinaryRow partition = partitionEntry.getKey();
+ for (Map.Entry<Range, List<DataSplit>> entry :
partitionEntry.getValue().entrySet()) {
+ Range range = entry.getKey();
+ List<DataSplit> rangeSplits = entry.getValue();
+ if (rangeSplits.isEmpty()) {
+ continue;
+ }
+
+ DataStream<Committable> commitMessages =
+ executeForPartitionRange(
+ env,
+ range,
+ rangeSplits,
+ readBuilder,
+ indexBuilder,
+ partitionFieldSize,
+
binaryRowSerializer.serializeToBytes(partition),
+ indexFieldPos,
+ rowIdPos,
+ indexFieldType,
+ sortColumns,
+ coreOptions,
+ readType,
+ recordsPerRange,
+ maxParallelism);
+
+ allCommitMessages =
+ allCommitMessages == null
+ ? commitMessages
+ : allCommitMessages.union(commitMessages);
+ }
+ }
+ if (allCommitMessages != null) {
+ commit(table, allCommitMessages);
+ }
+
+ env.execute("Create btree global index for table: " + table.name());
+ }
+
+ private static DataStream<Committable> executeForPartitionRange(
+ StreamExecutionEnvironment env,
+ Range range,
+ List<DataSplit> rangeSplits,
+ ReadBuilder readBuilder,
+ BTreeGlobalIndexBuilder indexBuilder,
+ int partitionFieldSize,
+ byte[] partition,
+ int indexFieldPos,
+ int rowIdPos,
+ DataType indexFieldType,
+ List<String> sortColumns,
+ CoreOptions coreOptions,
+ RowType readType,
+ long recordsPerRange,
+ int maxParallelism) {
+ int parallelism = Math.max((int) (range.count() / recordsPerRange), 1);
parallelism = Math.min(parallelism, maxParallelism);
- // 4. Create source from splits and select columns
DataStream<DataSplit> sourceStream =
- env.fromData(new JavaTypeInfo<>(DataSplit.class),
splits.toArray(new DataSplit[0]))
- .name("Global Index Source")
+ env.fromData(
+ new JavaTypeInfo<>(DataSplit.class),
+ rangeSplits.toArray(new DataSplit[0]))
+ .name("Global Index Source " + " range=" + range)
.setParallelism(1);
- ReadBuilder readBuilder =
table.newReadBuilder().withReadType(readType);
DataStream<RowData> rowDataStream =
sourceStream
.transform(
- "Read Data",
+ "Read Data " + range,
InternalTypeInfo.of(LogicalTypeConversion.toLogicalType(readType)),
new ReadDataOperator(readBuilder))
.setParallelism(parallelism);
- // 5. Sort data using TableSorter style
- // Configure sort info similar to SortCompactAction
- CoreOptions coreOptions = table.coreOptions();
TableSortInfo sortInfo =
new TableSortInfo.Builder()
- .setSortColumns(selectedColumns)
+ .setSortColumns(sortColumns)
.setSortStrategy(CoreOptions.OrderType.ORDER)
.setSinkParallelism(parallelism)
.setLocalSampleSize(parallelism *
coreOptions.getLocalSampleMagnification())
@@ -129,24 +202,23 @@ public class BTreeIndexTopoBuilder {
.setRangeNumber(parallelism * 10)
.build();
- // Use TableSorter for sorting
TableSorter sorter =
TableSorter.getSorter(env, rowDataStream, coreOptions,
readType, sortInfo);
DataStream<RowData> sortedStream = sorter.sort();
- // 6. Build index for each partition
- DataStream<Committable> commitMessages =
- sortedStream
- .transform(
- "write-btree-index",
- new CommittableTypeInfo(),
- new WriteIndexOperator(range, indexBuilder))
- .setParallelism(parallelism);
-
- // 7. Commit all commit messages
- commit(table, commitMessages);
-
- env.execute("Create btree global index for table: " + table.name());
+ return sortedStream
+ .transform(
+ "write-btree-index " + range,
+ new CommittableTypeInfo(),
+ new WriteIndexOperator(
+ range,
+ partitionFieldSize,
+ partition,
+ indexBuilder,
+ indexFieldPos,
+ rowIdPos,
+ indexFieldType))
+ .setParallelism(parallelism);
}
private static void commit(FileStoreTable table, DataStream<Committable>
written) {
@@ -154,7 +226,7 @@ public class BTreeIndexTopoBuilder {
new CommitterOperatorFactory<>(
false,
true,
- "BTreeIndexCommitter",
+ "BTreeIndexCommitter-" + UUID.randomUUID(),
context ->
new StoreCommitter(
table,
table.newCommit(context.commitUser()), context),
@@ -200,59 +272,75 @@ public class BTreeIndexTopoBuilder {
private static class WriteIndexOperator extends
BoundedOneInputOperator<RowData, Committable> {
private final Range rowRange;
+ private final byte[] partition;
+ private final int partitionFieldSize;
private final BTreeGlobalIndexBuilder builder;
+ private final int indexFieldPos;
+ private final int rowIdPos;
+ private final DataType indexFieldType;
private transient long counter;
- private transient BinaryRow currentPart;
private transient GlobalIndexParallelWriter currentWriter;
private transient List<CommitMessage> commitMessages;
-
- public WriteIndexOperator(Range rowRange, BTreeGlobalIndexBuilder
builder) {
+ private transient InternalRow.FieldGetter indexFieldGetter;
+ private transient BinaryRowSerializer binaryRowSerializer;
+
+ public WriteIndexOperator(
+ Range rowRange,
+ int partitionFieldSize,
+ byte[] partition,
+ BTreeGlobalIndexBuilder builder,
+ int indexFieldPos,
+ int rowIdPos,
+ DataType indexFieldType) {
this.rowRange = rowRange;
+ this.partitionFieldSize = partitionFieldSize;
+ this.partition = partition;
this.builder = builder;
+ this.indexFieldPos = indexFieldPos;
+ this.rowIdPos = rowIdPos;
+ this.indexFieldType = indexFieldType;
}
@Override
public void open() throws Exception {
super.open();
commitMessages = new ArrayList<>();
+ indexFieldGetter = InternalRow.createFieldGetter(indexFieldType,
indexFieldPos);
+ this.binaryRowSerializer = new
BinaryRowSerializer(partitionFieldSize);
}
@Override
public void processElement(StreamRecord<RowData> element) throws
IOException {
InternalRow row = new FlinkRowWrapper(element.getValue());
-
- RowIdIndexFieldsExtractor extractor = builder.extractor();
- BinaryRow partRow = extractor.extractPartition(row);
-
- // the input is sorted by <partition, indexedField>
- if (currentWriter != null) {
- if (!Objects.equals(partRow, currentPart) || counter >=
builder.recordsPerRange()) {
- commitMessages.add(
- builder.flushIndex(rowRange,
currentWriter.finish(), currentPart));
- currentWriter = null;
- counter = 0;
- }
+ if (currentWriter != null && counter >= builder.recordsPerRange())
{
+ commitMessages.add(
+ builder.flushIndex(
+ rowRange,
+ currentWriter.finish(),
+
binaryRowSerializer.deserializeFromBytes(partition)));
+ currentWriter = null;
+ counter = 0;
}
- // write <value, rowId> pair to index file
- currentPart = partRow;
counter++;
if (currentWriter == null) {
currentWriter = builder.createWriter();
}
- // convert the original rowId to local rowId
- long localRowId = extractor.extractRowId(row) - rowRange.from;
- currentWriter.write(extractor.extractIndexField(row), localRowId);
+ long localRowId = row.getLong(rowIdPos) - rowRange.from;
+ currentWriter.write(indexFieldGetter.getFieldOrNull(row),
localRowId);
}
@Override
public void endInput() throws IOException {
if (counter > 0) {
commitMessages.add(
- builder.flushIndex(rowRange, currentWriter.finish(),
currentPart));
+ builder.flushIndex(
+ rowRange,
+ currentWriter.finish(),
+
binaryRowSerializer.deserializeFromBytes(partition)));
}
for (CommitMessage message : commitMessages) {
output.collect(
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
index 51cd79f378..d6368da23f 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BTreeGlobalIndexITCase.java
@@ -27,6 +27,7 @@ import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
import java.util.List;
+import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -66,4 +67,77 @@ public class BTreeGlobalIndexITCase extends
CatalogITCaseBase {
// assert select with filter
assertThat(sql("SELECT * FROM T WHERE id =
100")).containsOnly(Row.of(100, "name_100"));
}
+
+ @Test
+ public void testBTreeIndexWithMultiPartition() throws
Catalog.TableNotExistException {
+ sql(
+ "CREATE TABLE T_MP (pt INT, id INT, name STRING) PARTITIONED
BY (pt) WITH ("
+ + "'global-index.enabled' = 'true', "
+ + "'row-tracking.enabled' = 'true', "
+ + "'data-evolution.enabled' = 'true'"
+ + ")");
+
+ // write partition 0: 1,000 rows (ids 0..999) in two inserts of 500
+ insertPartitionRows("T_MP", 0, 0, 500, "p0_a_");
+ insertPartitionRows("T_MP", 0, 500, 500, "p0_a_");
+ // write partition 1: 1,000 rows (ids 1000..1999)
+ insertPartitionRows("T_MP", 1, 1_000, 1_000, "p1_");
+ // write partition 0 again: 1,000 rows (ids 2000..2999)
+ insertPartitionRows("T_MP", 0, 2_000, 1_000, "p0_b_");
+
+ buildBTreeIndexForTable("T_MP", "id");
+
+ FileStoreTable table = paimonTable("T_MP");
+ List<IndexManifestEntry> btreeEntries =
+ table.store().newIndexFileHandler().scanEntries().stream()
+ .filter(e -> "btree".equals(e.indexFile().indexType()))
+ .collect(Collectors.toList());
+
+ long totalRowCount =
+ btreeEntries.stream()
+ .map(IndexManifestEntry::indexFile)
+ .mapToLong(IndexFileMeta::rowCount)
+ .sum();
+ Map<Object, Long> partitionRowCounts =
+ btreeEntries.stream()
+ .collect(
+ Collectors.groupingBy(
+ IndexManifestEntry::partition,
+ Collectors.summingLong(e ->
e.indexFile().rowCount())));
+
+ assertThat(partitionRowCounts).hasSize(2);
+
assertThat(partitionRowCounts.values()).containsExactlyInAnyOrder(1_000L,
2_000L);
+ assertThat(totalRowCount).isEqualTo(3_000L);
+
+ assertThat(sql("SELECT * FROM T_MP WHERE id = 999"))
+ .containsOnly(Row.of(0, 999, "p0_a_999"));
+ assertThat(sql("SELECT * FROM T_MP WHERE id = 1500"))
+ .containsOnly(Row.of(1, 1500, "p1_1500"));
+ assertThat(sql("SELECT * FROM T_MP WHERE id = 2500"))
+ .containsOnly(Row.of(0, 2500, "p0_b_2500"));
+ }
+
+ private void insertPartitionRows(
+ String tableName, int partition, int startId, int count, String
namePrefix) {
+ final int batchSize = 5_000;
+ for (int offset = 0; offset < count; offset += batchSize) {
+ int batchStart = startId + offset;
+ int batchEnd = Math.min(startId + count, batchStart + batchSize);
+ String values =
+ IntStream.range(batchStart, batchEnd)
+ .mapToObj(
+ i ->
+ String.format(
+ "(%d, %d, '%s%d')",
+ partition, i, namePrefix,
i))
+ .collect(Collectors.joining(","));
+ sql("INSERT INTO %s VALUES %s", tableName, values);
+ }
+ }
+
+ private void buildBTreeIndexForTable(String tableName, String indexColumn)
{
+ sql(
+ "CALL sys.create_global_index(`table` => 'default.%s',
index_column => '%s', index_type => 'btree')",
+ tableName, indexColumn);
+ }
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
index fc8c6b83cb..20d8bc5de2 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/DefaultGlobalIndexBuilder.java
@@ -84,7 +84,14 @@ public class DefaultGlobalIndexBuilder implements
Serializable {
LongCounter rowCounter = new LongCounter(0);
List<ResultEntry> resultEntries = writePaimonRows(data, rowCounter);
List<IndexFileMeta> indexFileMetas =
- toIndexFileMetas(table, rowRange, indexField.id(), indexType,
resultEntries);
+ toIndexFileMetas(
+ table.fileIO(),
+ table.store().pathFactory().globalIndexFileFactory(),
+ table.coreOptions(),
+ rowRange,
+ indexField.id(),
+ indexType,
+ resultEntries);
DataIncrement dataIncrement =
DataIncrement.indexIncrement(indexFileMetas);
return new CommitMessageImpl(
partition, 0, null, dataIncrement,
CompactIncrement.emptyIncrement());
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
index fdfe0f532a..644fa81cdd 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java
@@ -18,7 +18,9 @@
package org.apache.paimon.spark.globalindex.btree;
+import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.BinaryRowSerializer;
import org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder;
import org.apache.paimon.globalindex.btree.BTreeIndexOptions;
import org.apache.paimon.options.Options;
@@ -27,7 +29,6 @@ import org.apache.paimon.spark.SparkRow;
import org.apache.paimon.spark.globalindex.GlobalIndexTopologyBuilder;
import org.apache.paimon.spark.util.ScanPlanHelper$;
import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.SpecialFields;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageSerializer;
import org.apache.paimon.table.source.DataSplit;
@@ -51,8 +52,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
-import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.calcRowRange;
+import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.groupSplitsByRange;
+import static
org.apache.paimon.globalindex.btree.BTreeGlobalIndexBuilder.splitByContiguousRowRange;
/** The {@link GlobalIndexTopologyBuilder} for BTree index. */
public class BTreeIndexTopoBuilder implements GlobalIndexTopologyBuilder {
@@ -82,72 +85,94 @@ public class BTreeIndexTopoBuilder implements
GlobalIndexTopologyBuilder {
indexBuilder =
indexBuilder.withPartitionPredicate(partitionPredicate);
}
- List<DataSplit> splits = indexBuilder.scan();
- Range range = calcRowRange(splits);
- if (splits.isEmpty() || range == null) {
+ List<DataSplit> splits =
splitByContiguousRowRange(indexBuilder.scan());
+ if (splits.isEmpty()) {
+ return Collections.emptyList();
+ }
+ Map<BinaryRow, Map<Range, List<DataSplit>>> partitionRangeSplits =
+ groupSplitsByRange(splits);
+ if (partitionRangeSplits.isEmpty()) {
return Collections.emptyList();
}
- // we need to read all partition columns for shuffle
- List<String> selectedColumns = new ArrayList<>();
- selectedColumns.addAll(table.partitionKeys());
- selectedColumns.addAll(readType.getFieldNames());
-
- Dataset<Row> source =
- PaimonUtils.createDataset(
- spark,
- ScanPlanHelper$.MODULE$.createNewScanPlan(
- splits.toArray(new DataSplit[0]), relation));
-
- Dataset<Row> selected =
-
source.select(selectedColumns.stream().map(functions::col).toArray(Column[]::new));
-
- // 2. shuffle and sort by partitions and index keys
- Column[] sortFields =
- selectedColumns.stream()
- .filter(name ->
!SpecialFields.ROW_ID.name().equals(name))
- .map(functions::col)
- .toArray(Column[]::new);
+ List<String> selectedColumns = new
ArrayList<>(readType.getFieldNames());
+ // Calculate maximum parallelism bound
long recordsPerRange =
options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE);
- // this should be superfast since append only table can utilize
count-start pushdown well.
- long rowCount = source.count();
- int partitionNum = Math.max((int) (rowCount / recordsPerRange), 1);
int maxParallelism =
options.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM);
- partitionNum = Math.min(partitionNum, maxParallelism);
-
- // For efficiency, we do not repartition within each paimon partition.
Instead, we directly
- // divide ranges by <partitions, index field>, and each subtask is
expected to process
- // records from multiple partitions. The drawback is that if a Paimon
partition spans
- // multiple Spark partitions, the first and last output files may
contain relatively few
- // records.
- Dataset<Row> partitioned =
- selected.repartitionByRange(partitionNum, sortFields)
- .sortWithinPartitions(sortFields);
-
- // 3. write index for each partition & range
- final byte[] serializedBuilder =
InstantiationUtil.serializeObject(indexBuilder);
- final RowType rowType =
-
SpecialFields.rowTypeWithRowId(table.rowType()).project(selectedColumns);
- JavaRDD<byte[]> written =
- partitioned
- .javaRDD()
- .map(row -> (InternalRow) (new SparkRow(rowType, row)))
- .mapPartitions(
- (FlatMapFunction<Iterator<InternalRow>,
byte[]>)
- iter -> buildBTreeIndex(iter,
serializedBuilder, range));
-
- // 4. collect all commit messages and return
- List<byte[]> commitBytes = written.collect();
- return CommitMessageSerializer.deserializeAll(commitBytes);
+
+ List<CommitMessage> allMessages = new ArrayList<>();
+ List<String> sortColumns = new ArrayList<>();
+ sortColumns.add(indexField.name());
+ final int partitionKeyNum = table.partitionKeys().size();
+ BinaryRowSerializer binaryRowSerializer = new
BinaryRowSerializer(partitionKeyNum);
+ for (Map.Entry<BinaryRow, Map<Range, List<DataSplit>>> partitionEntry :
+ partitionRangeSplits.entrySet()) {
+ for (Map.Entry<Range, List<DataSplit>> entry :
partitionEntry.getValue().entrySet()) {
+ Range range = entry.getKey();
+ List<DataSplit> rangeSplits = entry.getValue();
+ if (rangeSplits.isEmpty()) {
+ continue;
+ }
+ int partitionNum = Math.max((int) (range.count() /
recordsPerRange), 1);
+ partitionNum = Math.min(partitionNum, maxParallelism);
+
+ Dataset<Row> source =
+ PaimonUtils.createDataset(
+ spark,
+ ScanPlanHelper$.MODULE$.createNewScanPlan(
+ rangeSplits.toArray(new DataSplit[0]),
relation));
+
+ Dataset<Row> selected =
+ source.select(
+ selectedColumns.stream()
+ .map(functions::col)
+ .toArray(Column[]::new));
+
+ Column[] sortFields =
+
sortColumns.stream().map(functions::col).toArray(Column[]::new);
+
+ Dataset<Row> partitioned =
+ selected.repartitionByRange(partitionNum, sortFields)
+ .sortWithinPartitions(sortFields);
+
+ final byte[] serializedBuilder =
InstantiationUtil.serializeObject(indexBuilder);
+ final byte[] partitionBytes =
+
binaryRowSerializer.serializeToBytes(partitionEntry.getKey());
+ JavaRDD<byte[]> written =
+ partitioned
+ .javaRDD()
+ .map(row -> (InternalRow) (new
SparkRow(readType, row)))
+ .mapPartitions(
+
(FlatMapFunction<Iterator<InternalRow>, byte[]>)
+ iter ->
+ buildBTreeIndex(
+ iter,
+
serializedBuilder,
+ range,
+
partitionKeyNum,
+
partitionBytes));
+ List<byte[]> commitBytes = written.collect();
+
allMessages.addAll(CommitMessageSerializer.deserializeAll(commitBytes));
+ }
+ }
+ return allMessages;
}
private static Iterator<byte[]> buildBTreeIndex(
- Iterator<InternalRow> input, byte[] serializedBuilder, Range range)
+ Iterator<InternalRow> input,
+ byte[] serializedBuilder,
+ Range range,
+ int partitionKeyNum,
+ byte[] partitionBytes)
throws IOException, ClassNotFoundException {
+ final BinaryRowSerializer binaryRowSerializer = new
BinaryRowSerializer(partitionKeyNum);
+ BinaryRow partition =
binaryRowSerializer.deserializeFromBytes(partitionBytes);
BTreeGlobalIndexBuilder builder =
InstantiationUtil.deserializeObject(
serializedBuilder,
BTreeGlobalIndexBuilder.class.getClassLoader());
- return CommitMessageSerializer.serializeAll(builder.build(range,
input)).iterator();
+ return CommitMessageSerializer.serializeAll(
+ builder.buildForSinglePartition(range, partition,
input))
+ .iterator();
}
}
diff --git
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
index 8ff94e1d21..ddc7477bf7 100644
---
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
+++
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedure.java
@@ -157,7 +157,11 @@ public class CreateGlobalIndexProcedure extends
BaseProcedure {
userOptions);
try (TableCommitImpl commit =
- table.newCommit("global-index-create-" +
UUID.randomUUID())) {
+ table.newCommit(
+ "global-index-"
+ + indexType
+ + "-create-"
+ + UUID.randomUUID())) {
commit.commit(indexResults);
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
index e20469c371..029d6cd378 100644
---
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala
@@ -177,7 +177,6 @@ class CreateGlobalIndexProcedureTest extends
PaimonSparkTestBase with StreamTest
.asScala
.filter(_.indexFile().indexType() == "btree")
.map(_.indexFile())
- table.store().newGlobalIndexScanBuilder().shardList()
assert(btreeEntries.nonEmpty)
// 1. assert total row count, file count and row range
@@ -237,22 +236,22 @@ class CreateGlobalIndexProcedureTest extends
PaimonSparkTestBase with StreamTest
(0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")
- values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")
- values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",")
+ values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")
values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")
- values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+ values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")
- values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
+ values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")
- values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",")
+ values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",")
spark.sql(s"INSERT INTO T VALUES $values")
val output =