This is an automated email from the ASF dual-hosted git repository.

JingsongLi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new af1af37121 [flink] Introduce drop global index procedure. (#7822)
af1af37121 is described below

commit af1af37121a2953ca278e6f8304de90d8b1215b1
Author: zhoulii <[email protected]>
AuthorDate: Tue May 12 16:35:46 2026 +0800

    [flink] Introduce drop global index procedure. (#7822)
---
 docs/content/flink/procedures.md                   |  62 +++++
 .../flink/procedure/DropGlobalIndexProcedure.java  | 200 ++++++++++++++
 .../services/org.apache.paimon.factories.Factory   |   3 +-
 .../procedure/DropGlobalIndexProcedureITCase.java  | 304 +++++++++++++++++++++
 4 files changed, 568 insertions(+), 1 deletion(-)

diff --git a/docs/content/flink/procedures.md b/docs/content/flink/procedures.md
index 1c982c3474..3202aedbbe 100644
--- a/docs/content/flink/procedures.md
+++ b/docs/content/flink/procedures.md
@@ -951,6 +951,68 @@ All available procedures are listed below.
          CALL sys.drop_function(`function` => 'function_identifier')<br/>
       </td>
    </tr>
+   <tr>
+      <td>create_global_index</td>
+      <td>
+         CALL [catalog.]sys.create_global_index(<br/>
+            `table` => 'table',<br/>
+            `index_column` => 'columnName',<br/>
+            `index_type` => 'indexType',<br/>
+            `partitions` => 'partitions',<br/>
+            `options` => 'key1=value1,key2=value2')<br/>
+      </td>
+      <td>
+         To create a global index on a table for accelerating queries. Arguments:
+            <li>table(required): the target table identifier.</li>
+            <li>index_column(required): the column name to build index on.</li>
+            <li>index_type(required): the type of global index, supported types include 'bitmap', 'btree', 'lumina', 'tantivy-fulltext'.</li>
+            <li>partitions(optional): partition filter for selective index creation.</li>
+            <li>options(optional): additional dynamic options for index creation.</li>
+      </td>
+      <td>
+         -- Create bitmap index<br/>
+         CALL sys.create_global_index(<br/>
+            `table` => 'default.T',<br/>
+            `index_column` => 'name',<br/>
+            `index_type` => 'bitmap')<br/><br/>
+         -- Create index for specific partitions<br/>
+         CALL sys.create_global_index(<br/>
+            `table` => 'default.T',<br/>
+            `index_column` => 'name',<br/>
+            `index_type` => 'bitmap',<br/>
+            `partitions` => 'pt=p1;pt=p2')
+      </td>
+   </tr>
+   <tr>
+      <td>drop_global_index</td>
+      <td>
+         CALL [catalog.]sys.drop_global_index(<br/>
+            `table` => 'table',<br/>
+            `index_column` => 'columnName',<br/>
+            `index_type` => 'indexType',<br/>
+            `partitions` => 'partitions')<br/>
+      </td>
+      <td>
+         To drop global index files from a table. Arguments:
+            <li>table(required): the target table identifier.</li>
+            <li>index_column(required): the column name for which to drop the index.</li>
+            <li>index_type(required): the type of global index to drop, e.g., 'bitmap', 'btree'.</li>
+            <li>partitions(optional): partition specification for selective index deletion.</li>
+      </td>
+      <td>
+         -- Drop all bitmap indexes for column 'name'<br/>
+         CALL sys.drop_global_index(<br/>
+            `table` => 'default.T',<br/>
+            `index_column` => 'name',<br/>
+            `index_type` => 'bitmap')<br/><br/>
+         -- Drop indexes only for specific partitions<br/>
+         CALL sys.drop_global_index(<br/>
+            `table` => 'default.T',<br/>
+            `index_column` => 'name',<br/>
+            `index_type` => 'bitmap',<br/>
+            `partitions` => 'pt=p1;pt=p2')
+      </td>
+   </tr>
    <tr>
       <td>vector_search</td>
       <td>
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java
new file mode 100644
index 0000000000..a5ab0239c2
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedure.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.index.IndexFileMeta;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataIncrement;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.partition.PartitionPredicate;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
+import org.apache.paimon.table.sink.TableCommitImpl;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.ParameterUtils;
+
+import org.apache.flink.table.annotation.ArgumentHint;
+import org.apache.flink.table.annotation.DataTypeHint;
+import org.apache.flink.table.annotation.ProcedureHint;
+import org.apache.flink.table.procedure.ProcedureContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.utils.ParameterUtils.getPartitions;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/** Procedure to drop global index files via Flink. */
+public class DropGlobalIndexProcedure extends ProcedureBase {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(DropGlobalIndexProcedure.class);
+
+    public static final String IDENTIFIER = "drop_global_index";
+
+    @Override
+    public String identifier() {
+        return IDENTIFIER;
+    }
+
+    @ProcedureHint(
+            argument = {
+                @ArgumentHint(name = "table", type = @DataTypeHint("STRING")),
+                @ArgumentHint(name = "index_column", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(name = "index_type", type = 
@DataTypeHint("STRING")),
+                @ArgumentHint(
+                        name = "partitions",
+                        type = @DataTypeHint("STRING"),
+                        isOptional = true)
+            })
+    public String[] call(
+            ProcedureContext procedureContext,
+            String tableId,
+            String indexColumn,
+            String indexType,
+            String partitions)
+            throws Exception {
+
+        FileStoreTable table = (FileStoreTable) table(tableId);
+
+        // Validate column exists
+        RowType rowType = table.rowType();
+        checkArgument(
+                rowType.containsField(indexColumn),
+                "Column '%s' does not exist in table '%s'.",
+                indexColumn,
+                tableId);
+
+        // Parse partition predicate
+        PartitionPredicate partitionPredicate = parsePartitionPredicate(table, 
partitions);
+
+        // Normalize index type
+        final String indexTypeLower = indexType.toLowerCase().trim();
+
+        // Get column field ID for final reference in lambda
+        final int columnId = rowType.getField(indexColumn).id();
+
+        // Get latest snapshot
+        Snapshot snapshot =
+                table.latestSnapshot()
+                        .orElseThrow(
+                                () ->
+                                        new IllegalStateException(
+                                                String.format(
+                                                        "Table '%s' has no 
snapshot.", tableId)));
+
+        // Create filter for index entries to delete
+        Filter<IndexManifestEntry> filter =
+                entry ->
+                        entry.indexFile().indexType().equals(indexTypeLower)
+                                && entry.indexFile().globalIndexMeta() != null
+                                && 
entry.indexFile().globalIndexMeta().indexFieldId() == columnId
+                                && (partitionPredicate == null
+                                        || 
partitionPredicate.test(entry.partition()));
+
+        // Scan for index files to delete
+        List<IndexManifestEntry> waitToDelete =
+                table.store().newIndexFileHandler().scan(snapshot, filter);
+
+        LOG.info(
+                "Found {} {} global index files to delete for column '{}' on 
table '{}'",
+                waitToDelete.size(),
+                indexTypeLower,
+                indexColumn,
+                table.name());
+
+        if (waitToDelete.isEmpty()) {
+            return new String[] {
+                "No " + indexTypeLower + " global index found for column '" + 
indexColumn + "'"
+            };
+        }
+
+        // Group index files by partition
+        Map<BinaryRow, List<IndexFileMeta>> deleteEntries =
+                waitToDelete.stream()
+                        .map(IndexManifestEntry::toDeleteEntry)
+                        .collect(
+                                Collectors.groupingBy(
+                                        IndexManifestEntry::partition,
+                                        Collectors.mapping(
+                                                IndexManifestEntry::indexFile,
+                                                Collectors.toList())));
+
+        // Create commit messages
+        List<CommitMessage> commitMessages = new ArrayList<>();
+        for (Map.Entry<BinaryRow, List<IndexFileMeta>> entry : 
deleteEntries.entrySet()) {
+            BinaryRow partition = entry.getKey();
+            List<IndexFileMeta> indexFileMetas = entry.getValue();
+            commitMessages.add(
+                    new CommitMessageImpl(
+                            partition,
+                            0,
+                            null,
+                            DataIncrement.deleteIndexIncrement(indexFileMetas),
+                            CompactIncrement.emptyIncrement()));
+        }
+
+        // Commit the deletion
+        try (TableCommitImpl commit = table.newCommit("drop-global-index-" + 
UUID.randomUUID())) {
+            commit.commit(commitMessages);
+        }
+
+        LOG.info(
+                "Successfully dropped {} {} global index files for column '{}' 
on table '{}'",
+                waitToDelete.size(),
+                indexTypeLower,
+                indexColumn,
+                table.name());
+
+        return new String[] {
+            "Dropped "
+                    + waitToDelete.size()
+                    + " "
+                    + indexTypeLower
+                    + " global index files for column '"
+                    + indexColumn
+                    + "' on table '"
+                    + table.name()
+                    + "'"
+        };
+    }
+
+    private PartitionPredicate parsePartitionPredicate(FileStoreTable table, 
String partitions) {
+        if (partitions == null || partitions.isEmpty()) {
+            return null;
+        }
+
+        List<Map<String, String>> partitionList = 
getPartitions(partitions.split(";"));
+        Predicate predicate =
+                ParameterUtils.toPartitionPredicate(
+                        partitionList,
+                        table.schema().logicalPartitionType(),
+                        table.coreOptions().partitionDefaultName());
+        return 
PartitionPredicate.fromPredicate(table.schema().logicalPartitionType(), 
predicate);
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index d687fe2244..c8f4640017 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -101,4 +101,5 @@ org.apache.paimon.flink.procedure.TriggerTagAutomaticCreationProcedure
 org.apache.paimon.flink.procedure.RemoveUnexistingManifestsProcedure
 org.apache.paimon.flink.procedure.DataEvolutionMergeIntoProcedure
 org.apache.paimon.flink.procedure.CreateGlobalIndexProcedure
-org.apache.paimon.flink.procedure.VectorSearchProcedure
\ No newline at end of file
+org.apache.paimon.flink.procedure.VectorSearchProcedure
+org.apache.paimon.flink.procedure.DropGlobalIndexProcedure
\ No newline at end of file
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java
new file mode 100644
index 0000000000..acda61357c
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/procedure/DropGlobalIndexProcedureITCase.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.flink.CatalogITCaseBase;
+import org.apache.paimon.manifest.IndexManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import org.apache.flink.types.Row;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT Case for {@link DropGlobalIndexProcedure}. */
+public class DropGlobalIndexProcedureITCase extends CatalogITCaseBase {
+
+    @Test
+    public void testDropBitmapGlobalIndex() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " id INT,"
+                        + " name STRING"
+                        + ") WITH ("
+                        + " 'bucket' = '-1',"
+                        + " 'global-index.row-count-per-shard' = '10000',"
+                        + " 'row-tracking.enabled' = 'true',"
+                        + " 'data-evolution.enabled' = 'true'"
+                        + ")");
+
+        // Insert 100000 records using BatchTableWrite for efficiency
+        FileStoreTable table = paimonTable("T");
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (int i = 0; i < 100000; i++) {
+                batchTableWrite.write(GenericRow.of(i, 
BinaryString.fromString("name_" + i)));
+            }
+            List<CommitMessage> commitMessages = 
batchTableWrite.prepareCommit();
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(commitMessages);
+            commit.close();
+        }
+
+        // Create bitmap index
+        tEnv.getConfig()
+                
.set(org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC, true);
+        List<Row> createResult =
+                sql(
+                        "CALL sys.create_global_index(`table` => 'default.T', "
+                                + "`index_column` => 'name', "
+                                + "`index_type` => 'bitmap')");
+        assertThat(createResult).hasSize(1);
+        assertThat(createResult.get(0).getField(0))
+                .isEqualTo("bitmap global index created successfully for 
table: T");
+
+        // Verify index was created
+        table = paimonTable("T");
+        List<IndexManifestEntry> bitmapEntries =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .filter(entry -> 
entry.indexFile().indexType().equals("bitmap"))
+                        .collect(Collectors.toList());
+        assertThat(bitmapEntries).isNotEmpty();
+        long totalRowCount =
+                bitmapEntries.stream()
+                        .map(entry -> entry.indexFile().rowCount())
+                        .mapToLong(Long::longValue)
+                        .sum();
+        assertThat(totalRowCount).isEqualTo(100000L);
+
+        // Drop bitmap index
+        List<Row> dropResult =
+                sql(
+                        "CALL sys.drop_global_index(`table` => 'default.T', "
+                                + "`index_column` => 'name', "
+                                + "`index_type` => 'bitmap')");
+        assertThat(dropResult).hasSize(1);
+        assertThat(dropResult.get(0).getField(0))
+                .isInstanceOf(String.class)
+                .asString()
+                .contains("Dropped")
+                .contains("bitmap")
+                .contains("global index files")
+                .contains("name");
+
+        // Verify index was dropped
+        table = paimonTable("T");
+        bitmapEntries =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .filter(entry -> 
entry.indexFile().indexType().equals("bitmap"))
+                        .collect(Collectors.toList());
+        assertThat(bitmapEntries).isEmpty();
+    }
+
+    @Test
+    public void testDropBitmapGlobalIndexWithPartition() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " id INT,"
+                        + " name STRING,"
+                        + " pt STRING"
+                        + ") PARTITIONED BY (pt) WITH ("
+                        + " 'bucket' = '-1',"
+                        + " 'global-index.row-count-per-shard' = '10000',"
+                        + " 'row-tracking.enabled' = 'true',"
+                        + " 'data-evolution.enabled' = 'true'"
+                        + ")");
+
+        // Insert records into different partitions using BatchTableWrite for 
efficiency
+        FileStoreTable table = paimonTable("T");
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (int i = 0; i < 65000; i++) {
+                batchTableWrite.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("name_" + i),
+                                BinaryString.fromString("p0")));
+            }
+
+            for (int i = 0; i < 35000; i++) {
+                batchTableWrite.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("name_" + i),
+                                BinaryString.fromString("p1")));
+            }
+
+            for (int i = 0; i < 22222; i++) {
+                batchTableWrite.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("name_" + i),
+                                BinaryString.fromString("p0")));
+            }
+
+            for (int i = 0; i < 100; i++) {
+                batchTableWrite.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("name_" + i),
+                                BinaryString.fromString("p1")));
+            }
+
+            for (int i = 0; i < 100; i++) {
+                batchTableWrite.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("name_" + i),
+                                BinaryString.fromString("p2")));
+            }
+            for (int i = 0; i < 33333; i++) {
+                batchTableWrite.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("name_" + i),
+                                BinaryString.fromString("p2")));
+            }
+
+            for (int i = 0; i < 33333; i++) {
+                batchTableWrite.write(
+                        GenericRow.of(
+                                i,
+                                BinaryString.fromString("name_" + i),
+                                BinaryString.fromString("p1")));
+            }
+
+            List<CommitMessage> commitMessages = 
batchTableWrite.prepareCommit();
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(commitMessages);
+            commit.close();
+        }
+
+        // Create bitmap index
+        tEnv.getConfig()
+                
.set(org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC, true);
+        List<Row> createResult =
+                sql(
+                        "CALL sys.create_global_index(`table` => 'default.T', "
+                                + "`index_column` => 'name', "
+                                + "`index_type` => 'bitmap')");
+        assertThat(createResult).hasSize(1);
+
+        // Verify index was created
+        table = paimonTable("T");
+        List<IndexManifestEntry> bitmapEntries =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .filter(entry -> 
entry.indexFile().indexType().equals("bitmap"))
+                        .collect(Collectors.toList());
+        assertThat(bitmapEntries).isNotEmpty();
+
+        // Verify total row count
+        long totalRowCount =
+                bitmapEntries.stream()
+                        .map(
+                                entry ->
+                                        
entry.indexFile().globalIndexMeta().rowRangeEnd()
+                                                - entry.indexFile()
+                                                        .globalIndexMeta()
+                                                        .rowRangeStart()
+                                                + 1)
+                        .mapToLong(Long::longValue)
+                        .sum();
+        assertThat(totalRowCount).isEqualTo(189088L);
+
+        // Drop bitmap index for partition p1 only
+        List<Row> dropResult =
+                sql(
+                        "CALL sys.drop_global_index(`table` => 'default.T', "
+                                + "`index_column` => 'name', "
+                                + "`index_type` => 'bitmap', "
+                                + "`partitions` => 'pt=p1')");
+        assertThat(dropResult).hasSize(1);
+        assertThat(dropResult.get(0).getField(0))
+                .isInstanceOf(String.class)
+                .asString()
+                .contains("Dropped")
+                .contains("bitmap");
+
+        // Verify only p1 index was dropped
+        bitmapEntries =
+                table.store().newIndexFileHandler().scanEntries().stream()
+                        .filter(entry -> 
entry.indexFile().indexType().equals("bitmap"))
+                        .collect(Collectors.toList());
+        assertThat(bitmapEntries).isNotEmpty();
+
+        // Verify remaining row count (p0: 87222 + p2: 33433 = 120655)
+        long remainingRowCount =
+                bitmapEntries.stream()
+                        .map(
+                                entry ->
+                                        
entry.indexFile().globalIndexMeta().rowRangeEnd()
+                                                - entry.indexFile()
+                                                        .globalIndexMeta()
+                                                        .rowRangeStart()
+                                                + 1)
+                        .mapToLong(Long::longValue)
+                        .sum();
+        assertThat(remainingRowCount).isEqualTo(120655L);
+    }
+
+    @Test
+    public void testDropNonExistentIndex() throws Exception {
+        sql(
+                "CREATE TABLE T ("
+                        + " id INT,"
+                        + " name STRING"
+                        + ") WITH ("
+                        + " 'bucket' = '-1',"
+                        + " 'row-tracking.enabled' = 'true',"
+                        + " 'data-evolution.enabled' = 'true'"
+                        + ")");
+
+        // Insert some data to create a snapshot
+        FileStoreTable table = paimonTable("T");
+        BatchWriteBuilder builder = table.newBatchWriteBuilder();
+        try (BatchTableWrite batchTableWrite = builder.newWrite()) {
+            for (int i = 0; i < 1000; i++) {
+                batchTableWrite.write(GenericRow.of(i, 
BinaryString.fromString("name_" + i)));
+            }
+            List<CommitMessage> commitMessages = 
batchTableWrite.prepareCommit();
+            BatchTableCommit commit = builder.newCommit();
+            commit.commit(commitMessages);
+            commit.close();
+        }
+
+        // Try to drop non-existent index
+        tEnv.getConfig()
+                
.set(org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC, true);
+        List<Row> dropResult =
+                sql(
+                        "CALL sys.drop_global_index(`table` => 'default.T', "
+                                + "`index_column` => 'name', "
+                                + "`index_type` => 'bitmap')");
+        assertThat(dropResult).hasSize(1);
+        assertThat(dropResult.get(0).getField(0))
+                .isInstanceOf(String.class)
+                .asString()
+                .contains("No bitmap global index found for column 'name'");
+    }
+}

Reply via email to