This is an automated email from the ASF dual-hosted git repository.
JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new af1af37121 [flink] Introduce drop global index procedure. (#7822)
af1af37121 is described below
commit af1af37121a2953ca278e6f8304de90d8b1215b1
Author: zhoulii <[email protected]>
AuthorDate: Tue May 12 16:35:46 2026 +0800
[flink] Introduce drop global index procedure. (#7822)
---
docs/content/flink/procedures.md | 62 +++++
.../flink/procedure/DropGlobalIndexProcedure.java | 200 ++++++++++++++
.../services/org.apache.paimon.factories.Factory | 3 +-
.../procedure/DropGlobalIndexProcedureITCase.java | 304 +++++++++++++++++++++
4 files changed, 568 insertions(+), 1 deletion(-)
diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 1c982c3474..3202aedbbe 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -951,6 +951,68 @@ All available procedures are listed below.
CALL sys.drop_function(`function` => 'function_identifier')<br/>
</td>
</tr>
+ <tr>
+ <td>create_global_index</td>
+ <td>
+ CALL [catalog.]sys.create_global_index(<br/>
+ `table` => 'table',<br/>
+ `index_column` => 'columnName',<br/>
+ `index_type` => 'indexType',<br/>
+ `partitions` => 'partitions',<br/>
+ `options` => 'key1=value1,key2=value2')<br/>
+ </td>
+ <td>
+ To create a global index on a table for accelerating queries.
Arguments:
+ <li>table(required): the target table identifier.</li>
+ <li>index_column(required): the column name to build index on.</li>
+ <li>index_type(required): the type of global index, supported
types include 'bitmap', 'btree', 'lumina', 'tantivy-fulltext'.</li>
+ <li>partitions(optional): partition filter for selective index
creation.</li>
+ <li>options(optional): additional dynamic options for index
creation.</li>
+ </td>
+ <td>
+ -- Create bitmap index<br/>
+ CALL sys.create_global_index(<br/>
+ `table` => 'default.T',<br/>
+ `index_column` => 'name',<br/>
+ `index_type` => 'bitmap')<br/><br/>
+ -- Create index for specific partitions<br/>
+ CALL sys.create_global_index(<br/>
+ `table` => 'default.T',<br/>
+ `index_column` => 'name',<br/>
+ `index_type` => 'bitmap',<br/>
+ `partitions` => 'pt=p1;pt=p2')
+ </td>
+ </tr>
+ <tr>
+ <td>drop_global_index</td>
+ <td>
+ CALL [catalog.]sys.drop_global_index(<br/>
+ `table` => 'table',<br/>
+ `index_column` => 'columnName',<br/>
+ `index_type` => 'indexType',<br/>
+ `partitions` => 'partitions')<br/>
+ </td>
+ <td>
+ To drop global index files from a table. Arguments:
+ <li>table(required): the target table identifier.</li>
+ <li>index_column(required): the column name for which to drop the
index.</li>
+ <li>index_type(required): the type of global index to drop, e.g.,
'bitmap', 'btree'.</li>
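+ <li>partitions(optional): semicolon-separated partition specs (e.g., 'pt=p1;pt=p2') selecting the partitions whose index files are dropped; when omitted, matching index files of all partitions are dropped.</li>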
+ </td>
+ <td>
+ -- Drop all bitmap indexes for column 'name'<br/>
+ CALL sys.drop_global_index(<br/>
+ `table` => 'default.T',<br/>
+ `index_column` => 'name',<br/>
+ `index_type` => 'bitmap')<br/><br/>
+ -- Drop indexes only for specific partitions<br/>
+ CALL sys.drop_global_index(<br/>
+ `table` => 'default.T',<br/>
+ `index_column` => 'name',<br/>
+ `index_type` => 'bitmap',<br/>
+ `partitions` => 'pt=p1;pt=p2')
+ </td>
+ </tr>
<tr>
<td>vector_search</td>
<td>
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java
new file mode 100644
index 0000000000..a5ab0239c2
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.ParameterUtils.getPartitions;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Procedure to drop global index files via Flink. */
+public class DropGlobalIndexProcedure extends ProcedureBase {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(DropGlobalIndexProcedure.class);
+
+ public static final String IDENTIFIER = "drop_global_index";
+
+    /** Returns {@link #IDENTIFIER}, the name used in {@code CALL sys.drop_global_index(...)}. */
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+ @ProcedureHint(
+ argument = {
+ @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+ @ArgumentHint(name = "index_column", type =
@DataTypeHint("STRING")),
+ @ArgumentHint(name = "index_type", type =
@DataTypeHint("STRING")),
+ @ArgumentHint(
+ name = "partitions",
+ type = @DataTypeHint("STRING"),
+ isOptional = true)
+ })
+ public String[] call(
+ ProcedureContext procedureContext,
+ String tableId,
+ String indexColumn,
+ String indexType,
+ String partitions)
+ throws Exception {
+
+ FileStoreTable table = (FileStoreTable) table(tableId);
+
+ // Validate column exists
+ RowType rowType = table.rowType();
+ checkArgument(
+ rowType.containsField(indexColumn),
+ "Column '%s' does not exist in table '%s'.",
+ indexColumn,
+ tableId);
+
+ // Parse partition predicate
+ PartitionPredicate partitionPredicate = parsePartitionPredicate(table,
partitions);
+
+ // Normalize index type
+ final String indexTypeLower = indexType.toLowerCase().trim();
+
+ // Get column field ID for final reference in lambda
+ final int columnId = rowType.getField(indexColumn).id();
+
+ // Get latest snapshot
+ Snapshot snapshot =
+ table.latestSnapshot()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ String.format(
+ "Table '%s' has no
snapshot.", tableId)));
+
+ // Create filter for index entries to delete
+ Filter<IndexManifestEntry> filter =
+ entry ->
+ entry.indexFile().indexType().equals(indexTypeLower)
+ && entry.indexFile().globalIndexMeta() != null
+ &&
entry.indexFile().globalIndexMeta().indexFieldId() == columnId
+ && (partitionPredicate == null
+ ||
partitionPredicate.test(entry.partition()));
+
+ // Scan for index files to delete
+ List<IndexManifestEntry> waitToDelete =
+ table.store().newIndexFileHandler().scan(snapshot, filter);
+
+ LOG.info(
+ "Found {} {} global index files to delete for column '{}' on
table '{}'",
+ waitToDelete.size(),
+ indexTypeLower,
+ indexColumn,
+ table.name());
+
+ if (waitToDelete.isEmpty()) {
+ return new String[] {
+ "No " + indexTypeLower + " global index found for column '" +
indexColumn + "'"
+ };
+ }
+
+ // Group index files by partition
+ Map<BinaryRow, List<IndexFileMeta>> deleteEntries =
+ waitToDelete.stream()
+ .map(IndexManifestEntry::toDeleteEntry)
+ .collect(
+ Collectors.groupingBy(
+ IndexManifestEntry::partition,
+ Collectors.mapping(
+ IndexManifestEntry::indexFile,
+ Collectors.toList())));
+
+ // Create commit messages
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ for (Map.Entry<BinaryRow, List<IndexFileMeta>> entry :
deleteEntries.entrySet()) {
+ BinaryRow partition = entry.getKey();
+ List<IndexFileMeta> indexFileMetas = entry.getValue();
+ commitMessages.add(
+ new CommitMessageImpl(
+ partition,
+ 0,
+ null,
+ DataIncrement.deleteIndexIncrement(indexFileMetas),
+ CompactIncrement.emptyIncrement()));
+ }
+
+ // Commit the deletion
+ try (TableCommitImpl commit = table.newCommit("drop-global-index-" +
UUID.randomUUID())) {
+ commit.commit(commitMessages);
+ }
+
+ LOG.info(
+ "Successfully dropped {} {} global index files for column '{}'
on table '{}'",
+ waitToDelete.size(),
+ indexTypeLower,
+ indexColumn,
+ table.name());
+
+ return new String[] {
+ "Dropped "
+ + waitToDelete.size()
+ + " "
+ + indexTypeLower
+ + " global index files for column '"
+ + indexColumn
+ + "' on table '"
+ + table.name()
+ + "'"
+ };
+ }
+
+    /**
+     * Converts the optional {@code partitions} argument into a partition predicate.
+     *
+     * @param table table whose partition type and default partition name are used for parsing
+     * @param partitions semicolon-separated partition specs, or null/empty for no restriction
+     * @return the predicate, or {@code null} when {@code partitions} is null or empty
+     */
+    private PartitionPredicate parsePartitionPredicate(FileStoreTable table, String partitions) {
+        boolean noPartitionFilter = partitions == null || partitions.isEmpty();
+        if (noPartitionFilter) {
+            return null;
+        }
+
+        // Each ';'-separated piece is one partition spec
+        String[] partitionSpecs = partitions.split(";");
+        List<Map<String, String>> parsedSpecs = getPartitions(partitionSpecs);
+
+        RowType partitionType = table.schema().logicalPartitionType();
+        Predicate partitionFilter =
+                ParameterUtils.toPartitionPredicate(
+                        parsedSpecs, partitionType, table.coreOptions().partitionDefaultName());
+        return PartitionPredicate.fromPredicate(partitionType, partitionFilter);
+    }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index d687fe2244..c8f4640017 100644
---
a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++
b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -101,4 +101,5 @@
org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
org.apache.paimon.flink.procedure.RemoveUnexistingManifestsProcedure
org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure
org.apache.paimon.flink.procedure.CreateGlobalIndexProcedure
-org.apache.paimon.flink.procedure.VectorSearchProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.VectorSearchProcedure
+org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
\ No newline at end of file
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java
new file mode 100644
index 0000000000..acda61357c
--- /dev/null
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link DropGlobalIndexProcedure}. */
+public class DropGlobalIndexProcedureITCase extends CatalogITCaseBase {
+
+    /**
+     * Builds a bitmap global index on an unpartitioned table, drops it through the procedure and
+     * checks that no bitmap index entries remain.
+     */
+    @Test
+    public void testDropBitmapGlobalIndex() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " id INT,"
+                        + " name STRING"
+                        + ") WITH ("
+                        + " 'bucket' = '-1',"
+                        + " 'global-index.row-count-per-shard' = '10000',"
+                        + " 'row-tracking.enabled' = 'true',"
+                        + " 'data-evolution.enabled' = 'true'"
+                        + ")");
+
+        // Insert 100000 records using BatchTableWrite for efficiency
+        FileStoreTable table = paimonTable("T");
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (int i = 0; i < 100000; i++) {
+                batchTableWrite.write(GenericRow.of(i, BinaryString.fromString("name_" + i)));
+            }
+            List<CommitMessage> commitMessages = batchTableWrite.prepareCommit();
+            // try-with-resources closes the commit even when commit() throws
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(commitMessages);
+            }
+        }
+
+        // Create bitmap index
+        tEnv.getConfig()
+                .set(org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC, true);
+        List<Row> createResult =
+                sql(
+                        "CALL sys.create_global_index(`table` => 'default.T', "
+                                + "`index_column` => 'name', "
+                                + "`index_type` => 'bitmap')");
+        assertThat(createResult).hasSize(1);
+        assertThat(createResult.get(0).getField(0))
+                .isEqualTo("bitmap global index created successfully for table: T");
+
+        // Verify index was created
+        table = paimonTable("T");
+        List<IndexManifestEntry> bitmapEntries =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .filter(entry -> entry.indexFile().indexType().equals("bitmap"))
+                        .collect(Collectors.toList());
+        assertThat(bitmapEntries).isNotEmpty();
+        long totalRowCount =
+                bitmapEntries.stream().mapToLong(entry -> entry.indexFile().rowCount()).sum();
+        assertThat(totalRowCount).isEqualTo(100000L);
+
+        // Drop bitmap index
+        List<Row> dropResult =
+                sql(
+                        "CALL sys.drop_global_index(`table` => 'default.T', "
+                                + "`index_column` => 'name', "
+                                + "`index_type` => 'bitmap')");
+        assertThat(dropResult).hasSize(1);
+        assertThat(dropResult.get(0).getField(0))
+                .isInstanceOf(String.class)
+                .asString()
+                .contains("Dropped")
+                .contains("bitmap")
+                .contains("global index files")
+                .contains("name");
+
+        // Verify index was dropped
+        table = paimonTable("T");
+        bitmapEntries =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .filter(entry -> entry.indexFile().indexType().equals("bitmap"))
+                        .collect(Collectors.toList());
+        assertThat(bitmapEntries).isEmpty();
+    }
+
+    /**
+     * Builds a bitmap global index on a table partitioned by {@code pt}, drops the index for
+     * partition {@code p1} only and checks that the index entries of the other partitions remain.
+     */
+    @Test
+    public void testDropBitmapGlobalIndexWithPartition() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " id INT,"
+                        + " name STRING,"
+                        + " pt STRING"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '-1',"
+                        + " 'global-index.row-count-per-shard' = '10000',"
+                        + " 'row-tracking.enabled' = 'true',"
+                        + " 'data-evolution.enabled' = 'true'"
+                        + ")");
+
+        // Batches are written in this order within a single commit.
+        // Per-partition totals: p0 = 87222, p1 = 68433, p2 = 33433 (189088 rows overall).
+        String[] batchPartitions = {"p0", "p1", "p0", "p1", "p2", "p2", "p1"};
+        int[] batchRowCounts = {65000, 35000, 22222, 100, 100, 33333, 33333};
+
+        // Insert records into different partitions using BatchTableWrite for efficiency
+        FileStoreTable table = paimonTable("T");
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (int batch = 0; batch < batchPartitions.length; batch++) {
+                for (int i = 0; i < batchRowCounts[batch]; i++) {
+                    batchTableWrite.write(
+                            GenericRow.of(
+                                    i,
+                                    BinaryString.fromString("name_" + i),
+                                    BinaryString.fromString(batchPartitions[batch])));
+                }
+            }
+
+            List<CommitMessage> commitMessages = batchTableWrite.prepareCommit();
+            // try-with-resources closes the commit even when commit() throws
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(commitMessages);
+            }
+        }
+
+        // Create bitmap index
+        tEnv.getConfig()
+                .set(org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC, true);
+        List<Row> createResult =
+                sql(
+                        "CALL sys.create_global_index(`table` => 'default.T', "
+                                + "`index_column` => 'name', "
+                                + "`index_type` => 'bitmap')");
+        assertThat(createResult).hasSize(1);
+
+        // Verify index was created
+        table = paimonTable("T");
+        List<IndexManifestEntry> bitmapEntries =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .filter(entry -> entry.indexFile().indexType().equals("bitmap"))
+                        .collect(Collectors.toList());
+        assertThat(bitmapEntries).isNotEmpty();
+
+        // Verify total row count (row ranges are inclusive, hence the + 1)
+        long totalRowCount =
+                bitmapEntries.stream()
+                        .mapToLong(
+                                entry ->
+                                        entry.indexFile().globalIndexMeta().rowRangeEnd()
+                                                - entry.indexFile()
+                                                        .globalIndexMeta()
+                                                        .rowRangeStart()
+                                                + 1)
+                        .sum();
+        assertThat(totalRowCount).isEqualTo(189088L);
+
+        // Drop bitmap index for partition p1 only
+        List<Row> dropResult =
+                sql(
+                        "CALL sys.drop_global_index(`table` => 'default.T', "
+                                + "`index_column` => 'name', "
+                                + "`index_type` => 'bitmap', "
+                                + "`partitions` => 'pt=p1')");
+        assertThat(dropResult).hasSize(1);
+        assertThat(dropResult.get(0).getField(0))
+                .isInstanceOf(String.class)
+                .asString()
+                .contains("Dropped")
+                .contains("bitmap");
+
+        // Verify only p1 index was dropped (reload the table handle first, as the
+        // unpartitioned test does after its drop)
+        table = paimonTable("T");
+        bitmapEntries =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .filter(entry -> entry.indexFile().indexType().equals("bitmap"))
+                        .collect(Collectors.toList());
+        assertThat(bitmapEntries).isNotEmpty();
+
+        // Verify remaining row count (p0: 87222 + p2: 33433 = 120655)
+        long remainingRowCount =
+                bitmapEntries.stream()
+                        .mapToLong(
+                                entry ->
+                                        entry.indexFile().globalIndexMeta().rowRangeEnd()
+                                                - entry.indexFile()
+                                                        .globalIndexMeta()
+                                                        .rowRangeStart()
+                                                + 1)
+                        .sum();
+        assertThat(remainingRowCount).isEqualTo(120655L);
+    }
+
+    /**
+     * Calls the drop procedure on a table that has a snapshot but no global index and checks
+     * that it reports that nothing was found.
+     */
+    @Test
+    public void testDropNonExistentIndex() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " id INT,"
+                        + " name STRING"
+                        + ") WITH ("
+                        + " 'bucket' = '-1',"
+                        + " 'row-tracking.enabled' = 'true',"
+                        + " 'data-evolution.enabled' = 'true'"
+                        + ")");
+
+        // Insert some data to create a snapshot
+        FileStoreTable table = paimonTable("T");
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (int i = 0; i < 1000; i++) {
+                batchTableWrite.write(GenericRow.of(i, BinaryString.fromString("name_" + i)));
+            }
+            List<CommitMessage> commitMessages = batchTableWrite.prepareCommit();
+            // try-with-resources closes the commit even when commit() throws
+            try (BatchTableCommit commit = builder.newCommit()) {
+                commit.commit(commitMessages);
+            }
+        }
+
+        // Try to drop non-existent index
+        tEnv.getConfig()
+                .set(org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC, true);
+        List<Row> dropResult =
+                sql(
+                        "CALL sys.drop_global_index(`table` => 'default.T', "
+                                + "`index_column` => 'name', "
+                                + "`index_type` => 'bitmap')");
+        assertThat(dropResult).hasSize(1);
+        assertThat(dropResult.get(0).getField(0))
+                .isInstanceOf(String.class)
+                .asString()
+                .contains("No bitmap global index found for column 'name'");
+    }
+}