Copilot commented on code in PR #6956: URL: https://github.com/apache/paimon/pull/6956#discussion_r2663895062
########## paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.globalindex.btree; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.GlobalIndexParallelWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.btree.BTreeIndexOptions; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilder; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * The {@link GlobalIndexBuilder} implementation for BTree Index. The caller of {@link + * BTreeGlobalIndexBuilder#build(CloseableIterator) build} must ensure the input data is sorted by + * partitions and indexed field. + */ +public class BTreeGlobalIndexBuilder extends GlobalIndexBuilder { + private static final double FLOATING = 1.2; + + private final IndexFieldsExtractor extractor; + private final long recordsPerRange; + private BinaryRow currentPart = null; + private GlobalIndexParallelWriter currentWriter = null; + private LongCounter counter = new LongCounter(); + + protected BTreeGlobalIndexBuilder(GlobalIndexBuilderContext context) { + super(context); + Preconditions.checkNotNull( + context.fullRange(), "Full range cannot be null for BTreeGlobalIndexBuilder."); + + FileStoreTable table = context.table(); + List<String> readColumns = new ArrayList<>(table.partitionKeys()); + readColumns.addAll(context.readType().getFieldNames()); + this.extractor = + new IndexFieldsExtractor( + SpecialFields.rowTypeWithRowId(table.rowType()).project(readColumns), + table.partitionKeys(), + context.indexField().name()); + + // Each partition boundary is derived from sampling, so we introduce a slack factor + // to avoid generating too many small files due to sampling variance. 
+ this.recordsPerRange = + (long) + (context.options().get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE) + * FLOATING); + } + + @Override + public List<CommitMessage> build(CloseableIterator<InternalRow> data) throws IOException { + List<CommitMessage> commitMessages = new ArrayList<>(); + + while (data.hasNext()) { + InternalRow row = data.next(); + + BinaryRow partRow = extractor.extractPartition(row); + // may flush last part data + // this is correct only if the input is sorted by <partition, indexedField> + if (currentPart != null && !partRow.equals(currentPart) + || counter.getValue() >= recordsPerRange) { Review Comment: The condition has incorrect operator precedence. The current expression evaluates as `(currentPart != null && !partRow.equals(currentPart)) || (counter.getValue() >= recordsPerRange)`, which means the flush will happen when counter exceeds recordsPerRange even if currentPart is null. This should be `(currentPart != null && (!partRow.equals(currentPart) || counter.getValue() >= recordsPerRange))` to ensure we only flush when currentPart is non-null AND either the partition changed OR the counter limit is reached. ```suggestion if (currentPart != null && (!partRow.equals(currentPart) || counter.getValue() >= recordsPerRange)) { ``` ########## paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeGlobalIndexBuilder.java: ########## @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.globalindex.btree; + +import org.apache.paimon.data.BinaryRow; +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.GlobalIndexParallelWriter; +import org.apache.paimon.globalindex.GlobalIndexWriter; +import org.apache.paimon.globalindex.ResultEntry; +import org.apache.paimon.globalindex.btree.BTreeIndexOptions; +import org.apache.paimon.index.IndexFileMeta; +import org.apache.paimon.io.CompactIncrement; +import org.apache.paimon.io.DataIncrement; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilder; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageImpl; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.LongCounter; +import org.apache.paimon.utils.Preconditions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * The {@link GlobalIndexBuilder} implementation for BTree Index. 
The caller of {@link + * BTreeGlobalIndexBuilder#build(CloseableIterator) build} must ensure the input data is sorted by + * partitions and indexed field. + */ +public class BTreeGlobalIndexBuilder extends GlobalIndexBuilder { Review Comment: The magic number 1.2 as the FLOATING factor lacks explanation. Consider adding a more detailed comment explaining why this specific value was chosen, what the trade-offs are, and under what conditions it might need adjustment. This will help future maintainers understand the reasoning behind this constant. ```suggestion public class BTreeGlobalIndexBuilder extends GlobalIndexBuilder { // Slack factor applied to the configured BTREE_INDEX_RECORDS_PER_RANGE. A value of 1.2 means // we allow 20% more records per range than the ideal evenly-partitioned case. This provides // tolerance against sampling error when computing partition boundaries: slightly larger // ranges reduce the chance of producing many tiny index files at the cost of fewer, larger // ranges. Tune this value if the sampling strategy or target index file size changes: // - Increase it (> 1.2) when sampling is noisy or when too many small index files are // observed. // - Decrease it (closer to 1.0) when index files become too large or when sampling is // accurate and stable. ``` ########## paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala: ########## @@ -137,4 +140,219 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest assert(totalRowCount == 189088L) } } + + test("create btree global index") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, name STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true', + | 'btree-index.records-per-range' = '1000') + |""".stripMargin) + + val values = + (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + val output = + spark + .sql( + "CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," + + " options => 'btree-index.records-per-range=1000')") + .collect() + .head + + assert(output.getBoolean(0)) + val table = loadTable("T") + val btreeEntries = table + .store() + .newIndexFileHandler() + .scanEntries() + .asScala + .filter(_.indexFile().indexType() == "btree") + .map(_.indexFile()) + table.store().newGlobalIndexScanBuilder().shardList() + assert(btreeEntries.nonEmpty) + + // 1. assert total row count and file count + val totalRowCount = btreeEntries.map(_.rowCount()).sum + assert(btreeEntries.size == 100) + assert(totalRowCount == 100000L) + + // 2. assert global index meta not null + btreeEntries.foreach(e => assert(e.globalIndexMeta() != null)) + + // 3. 
assert btree index file range non-overlapping + case class MetaWithKey(meta: BTreeIndexMeta, first: Object, last: Object) + val keySerializer = KeySerializer.create(new VarCharType()) + val comparator = keySerializer.createComparator() + + def deserialize(bytes: Array[Byte]): Object = { + keySerializer.deserialize(MemorySlice.wrap(bytes)) + } + + val btreeMetas = btreeEntries + .map(_.globalIndexMeta().indexMeta()) + .map(meta => BTreeIndexMeta.deserialize(meta)) + .map( + m => { + assert(m.getFirstKey != null) + assert(m.getLastKey != null) + MetaWithKey(m, deserialize(m.getFirstKey), deserialize(m.getLastKey)) + }) + + // sort by first key + val sorted = btreeMetas.sortWith((m1, m2) => comparator.compare(m1.first, m2.first) < 0) + + // should not overlap + sorted.sliding(2).foreach { + case Seq(prev: MetaWithKey, next: MetaWithKey) => + assert(comparator.compare(prev.last, next.first) <= 0) + case _ => // ignore + } + } + } + + test("create btree global index with multiple partitions") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, name STRING, pt STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + | PARTITIONED BY (pt) + |""".stripMargin) + + var values = + (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 22222).map(i => s"($i, 'name_$i', 'p0')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 100).map(i => s"($i, 'name_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 100).map(i => s"($i, 'name_$i', 'p2')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 33333).map(i => s"($i, 'name_$i', 'p2')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 33333).map(i => s"($i, 'name_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + val output = + spark + .sql( + "CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," + + " options => 'btree-index.records-per-range=1000')") + .collect() + .head + + assert(output.getBoolean(0)) + + assertMultiplePartitionsResult("T", 189088L, 3) + } + } + + test("create btree index within one spark partition") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, name STRING, pt STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true') + | PARTITIONED BY (pt) + |""".stripMargin) + + var values = + (0 until 65000).map(i => s"($i, 'name_$i', 'p0')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + values = (0 until 35000).map(i => s"($i, 'name_$i', 'p1')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + // force output parallelism = 1 + val output = + spark + .sql("CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," + + " options => 'btree-index.records-per-range=1000,btree-index.build.max-parallelism=1')") + .collect() + .head + + assert(output.getBoolean(0)) + + assertMultiplePartitionsResult("T", 100000L, 2) + } + } + + private def assertMultiplePartitionsResult( + tableName: String, + rowCount: Long, + partCount: Int + ): 
Unit = { + val table = loadTable(tableName) + val btreeEntries = table + .store() + .newIndexFileHandler() + .scanEntries() + .asScala + .filter(_.indexFile().indexType() == "btree") + table.store().newGlobalIndexScanBuilder().shardList() Review Comment: The result of 'table.store().newGlobalIndexScanBuilder().shardList()' is not assigned or used. This appears to be a leftover from debugging or testing. If the call has side effects that are needed for testing, add a comment explaining why; otherwise, remove this line. ```suggestion ``` ########## paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.globalindex.btree; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.btree.BTreeIndexOptions; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.options.Options; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.spark.SparkRow; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilder; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactoryUtils; +import org.apache.paimon.spark.globalindex.GlobalIndexTopoBuilder; +import org.apache.paimon.spark.util.ScanPlanHelper$; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageSerializer; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.Range; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.PaimonUtils; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; +import org.apache.spark.sql.functions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** The {@link GlobalIndexTopoBuilder} for BTree index. 
*/ +public class BTreeIndexTopoBuilder implements GlobalIndexTopoBuilder { + + @Override + public List<CommitMessage> buildIndex( + SparkSession spark, + DataSourceV2Relation relation, + PartitionPredicate partitionPredicate, + FileStoreTable table, + String indexType, + RowType readType, + DataField indexField, + Options options) + throws IOException { + + // 1. read the whole dataset of target partitions + SnapshotReader snapshotReader = table.newSnapshotReader(); + if (partitionPredicate != null) { + snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate); + } + + List<DataSplit> dataSplits = snapshotReader.read().dataSplits(); + Range fullRange = calcRowRange(dataSplits); + if (dataSplits.isEmpty() || fullRange == null) { + return Collections.emptyList(); + } + + // we need to read all partition columns for shuffle + List<String> selectedColumns = new ArrayList<>(); + selectedColumns.addAll(table.partitionKeys()); + selectedColumns.addAll(readType.getFieldNames()); + + Dataset<Row> source = + PaimonUtils.createDataset( + spark, + ScanPlanHelper$.MODULE$.createNewScanPlan( + dataSplits.toArray(new DataSplit[0]), relation)); + + Dataset<Row> selected = + source.select(selectedColumns.stream().map(functions::col).toArray(Column[]::new)); + + // 2. shuffle and sort by partitions and index keys + Column[] sortFields = + selectedColumns.stream() + .filter(name -> !SpecialFields.ROW_ID.name().equals(name)) + .map(functions::col) + .toArray(Column[]::new); + + long recordPerRange = options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE); + // this should be superfast since append only table can utilize count-start pushdown well. + long rowCount = source.count(); + int partitionNum = Math.max((int) (rowCount / recordPerRange), 1); + int maxParallelism = options.get(BTreeIndexOptions.BTREE_INDEX_BUILD_MAX_PARALLELISM); + partitionNum = Math.min(partitionNum, maxParallelism); + + // For efficiency, we do not repartition within each paimon partition. Instead, we directly + // divide ranges by <partitions, index field>, and each subtask is expected to process + // records from multiple partitions. The drawback is that if a Paimon partition spans + // multiple Spark partitions, the first and last output files may contain relatively few + // records. + Dataset<Row> partitioned = + selected.repartitionByRange(partitionNum, sortFields) + .sortWithinPartitions(sortFields); + + // 3. write index for each partition & range + final GlobalIndexBuilderContext context = + new GlobalIndexBuilderContext( + table, null, readType, indexField, indexType, 0, options, fullRange); + final RowType rowType = + SpecialFields.rowTypeWithRowId(table.rowType()).project(selectedColumns); + JavaRDD<byte[]> written = + partitioned + .javaRDD() + .map(row -> (InternalRow) (new SparkRow(rowType, row))) + .mapPartitions( + (FlatMapFunction<Iterator<InternalRow>, byte[]>) + iter -> { + CommitMessageSerializer commitMessageSerializer = + new CommitMessageSerializer(); + + GlobalIndexBuilder globalIndexBuilder = + GlobalIndexBuilderFactoryUtils + .createIndexBuilder(context); + + List<CommitMessage> commitMessages = + globalIndexBuilder.build( + CloseableIterator.adapterForIterator( + iter)); + List<byte[]> messageBytes = new ArrayList<>(); + + for (CommitMessage commitMessage : commitMessages) { + messageBytes.add( + commitMessageSerializer.serialize( + commitMessage)); Review Comment: The CloseableIterator created from the Spark Iterator is not being closed after use. 
Although the underlying Spark Iterator doesn't require closing, the build method may throw an IOException, and when that happens, the CloseableIterator should be properly closed to ensure any resources it manages are released. Consider wrapping the build call in a try-with-resources block to ensure proper cleanup even if an exception occurs. ```suggestion List<byte[]> messageBytes = new ArrayList<>(); try (CloseableIterator<InternalRow> closeableIterator = CloseableIterator.adapterForIterator(iter)) { List<CommitMessage> commitMessages = globalIndexBuilder.build(closeableIterator); for (CommitMessage commitMessage : commitMessages) { messageBytes.add( commitMessageSerializer.serialize( commitMessage)); } ``` ########## paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/globalindex/btree/BTreeIndexTopoBuilder.java: ########## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.globalindex.btree; + +import org.apache.paimon.data.InternalRow; +import org.apache.paimon.globalindex.btree.BTreeIndexOptions; +import org.apache.paimon.io.DataFileMeta; +import org.apache.paimon.options.Options; +import org.apache.paimon.partition.PartitionPredicate; +import org.apache.paimon.spark.SparkRow; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilder; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderContext; +import org.apache.paimon.spark.globalindex.GlobalIndexBuilderFactoryUtils; +import org.apache.paimon.spark.globalindex.GlobalIndexTopoBuilder; +import org.apache.paimon.spark.util.ScanPlanHelper$; +import org.apache.paimon.table.FileStoreTable; +import org.apache.paimon.table.SpecialFields; +import org.apache.paimon.table.sink.CommitMessage; +import org.apache.paimon.table.sink.CommitMessageSerializer; +import org.apache.paimon.table.source.DataSplit; +import org.apache.paimon.table.source.snapshot.SnapshotReader; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.RowType; +import org.apache.paimon.utils.CloseableIterator; +import org.apache.paimon.utils.Range; + +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Column; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.PaimonUtils; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation; +import org.apache.spark.sql.functions; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; + +/** The {@link GlobalIndexTopoBuilder} for BTree index. 
*/ +public class BTreeIndexTopoBuilder implements GlobalIndexTopoBuilder { + + @Override + public List<CommitMessage> buildIndex( + SparkSession spark, + DataSourceV2Relation relation, + PartitionPredicate partitionPredicate, + FileStoreTable table, + String indexType, + RowType readType, + DataField indexField, + Options options) + throws IOException { + + // 1. read the whole dataset of target partitions + SnapshotReader snapshotReader = table.newSnapshotReader(); + if (partitionPredicate != null) { + snapshotReader = snapshotReader.withPartitionFilter(partitionPredicate); + } + + List<DataSplit> dataSplits = snapshotReader.read().dataSplits(); + Range fullRange = calcRowRange(dataSplits); + if (dataSplits.isEmpty() || fullRange == null) { + return Collections.emptyList(); + } + + // we need to read all partition columns for shuffle + List<String> selectedColumns = new ArrayList<>(); + selectedColumns.addAll(table.partitionKeys()); + selectedColumns.addAll(readType.getFieldNames()); + + Dataset<Row> source = + PaimonUtils.createDataset( + spark, + ScanPlanHelper$.MODULE$.createNewScanPlan( + dataSplits.toArray(new DataSplit[0]), relation)); + + Dataset<Row> selected = + source.select(selectedColumns.stream().map(functions::col).toArray(Column[]::new)); + + // 2. shuffle and sort by partitions and index keys + Column[] sortFields = + selectedColumns.stream() + .filter(name -> !SpecialFields.ROW_ID.name().equals(name)) + .map(functions::col) + .toArray(Column[]::new); + + long recordPerRange = options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE); + // this should be superfast since append only table can utilize count-start pushdown well. + long rowCount = source.count(); + int partitionNum = Math.max((int) (rowCount / recordPerRange), 1); Review Comment: The variable name 'recordPerRange' should be 'recordsPerRange' (plural) to be grammatically correct and consistent with the configuration option name 'BTREE_INDEX_RECORDS_PER_RANGE' and the field name 'recordsPerRange' used in BTreeGlobalIndexBuilder. ```suggestion long recordsPerRange = options.get(BTreeIndexOptions.BTREE_INDEX_RECORDS_PER_RANGE); // this should be superfast since append only table can utilize count-start pushdown well. 
long rowCount = source.count(); int partitionNum = Math.max((int) (rowCount / recordsPerRange), 1); ``` ########## paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/CreateGlobalIndexProcedureTest.scala: ########## @@ -137,4 +140,219 @@ class CreateGlobalIndexProcedureTest extends PaimonSparkTestBase with StreamTest assert(totalRowCount == 189088L) } } + + test("create btree global index") { + withTable("T") { + spark.sql(""" + |CREATE TABLE T (id INT, name STRING) + |TBLPROPERTIES ( + | 'bucket' = '-1', + | 'global-index.row-count-per-shard' = '10000', + | 'row-tracking.enabled' = 'true', + | 'data-evolution.enabled' = 'true', + | 'btree-index.records-per-range' = '1000') + |""".stripMargin) + + val values = + (0 until 100000).map(i => s"($i, 'name_$i')").mkString(",") + spark.sql(s"INSERT INTO T VALUES $values") + + val output = + spark + .sql( + "CALL sys.create_global_index(table => 'test.T', index_column => 'name', index_type => 'btree'," + + " options => 'btree-index.records-per-range=1000')") + .collect() + .head + + assert(output.getBoolean(0)) + val table = loadTable("T") + val btreeEntries = table + .store() + .newIndexFileHandler() + .scanEntries() + .asScala + .filter(_.indexFile().indexType() == "btree") + .map(_.indexFile()) + table.store().newGlobalIndexScanBuilder().shardList() Review Comment: The result of 'table.store().newGlobalIndexScanBuilder().shardList()' is not assigned or used. This appears to be a leftover from debugging or testing. If the call has side effects that are needed for testing, add a comment explaining why; otherwise, remove this line. ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
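
As a standalone illustration of the precedence point raised in the first comment: in Java, `&&` binds more tightly than `||`, so `a && b || c` parses as `(a && b) || c`. The sketch below is plain Java, not Paimon code, with hypothetical booleans standing in for the real sub-conditions.

```java
// Minimal sketch of the operator-precedence issue: && binds tighter than ||.
public class PrecedenceDemo {
    public static void main(String[] args) {
        boolean partNonNull = false;     // stands in for `currentPart != null`
        boolean partChanged = true;      // stands in for `!partRow.equals(currentPart)`
        boolean counterExceeded = true;  // stands in for `counter.getValue() >= recordsPerRange`

        // As written in the PR: parsed as (partNonNull && partChanged) || counterExceeded,
        // so the flush branch is taken even though the null guard is false.
        boolean asWritten = partNonNull && partChanged || counterExceeded;

        // As suggested in the review: the null guard covers both sub-conditions.
        boolean asSuggested = partNonNull && (partChanged || counterExceeded);

        System.out.println(asWritten);   // true
        System.out.println(asSuggested); // false
    }
}
```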

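A small, self-contained sketch of the try-with-resources pattern recommended for BTreeIndexTopoBuilder. The `SimpleCloseableIterator` interface and `adapt` helper below are hypothetical stand-ins rather than Paimon's `CloseableIterator` API; the sketch only demonstrates that the resource is closed even when the consuming code throws, which is the guarantee the comment asks for.

```java
import java.util.Arrays;
import java.util.Iterator;

// Hedged sketch: try-with-resources closes the iterator even if the loop body throws.
public class TryWithResourcesDemo {

    // Hypothetical stand-in for a closeable iterator type (not Paimon's CloseableIterator).
    interface SimpleCloseableIterator<T> extends Iterator<T>, AutoCloseable {
        @Override
        void close(); // narrow the throws clause to keep the sketch small
    }

    // Wraps a plain Iterator and runs a callback on close, loosely mimicking adapterForIterator.
    static <T> SimpleCloseableIterator<T> adapt(Iterator<T> iter, Runnable onClose) {
        return new SimpleCloseableIterator<T>() {
            @Override public boolean hasNext() { return iter.hasNext(); }
            @Override public T next() { return iter.next(); }
            @Override public void close() { onClose.run(); }
        };
    }

    public static void main(String[] args) {
        Iterator<String> rows = Arrays.asList("a", "b").iterator();
        try (SimpleCloseableIterator<String> it = adapt(rows, () -> System.out.println("closed"))) {
            while (it.hasNext()) {
                if (it.next().equals("b")) {
                    // Simulates build(...) failing with an exception partway through.
                    throw new IllegalStateException("simulated failure in build()");
                }
            }
        } catch (IllegalStateException e) {
            // "closed" has already been printed: the iterator was closed before this handler ran.
            System.out.println("caught: " + e.getMessage());
        }
    }
}
```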