This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b291683c1b [flink] Rewrite_file_index action job add partitions option (#6133)
b291683c1b is described below

commit b291683c1b393e742246839176e699efab4bd095
Author: Tom <1547107...@qq.com>
AuthorDate: Mon Aug 25 09:58:08 2025 +0800

    [flink] Rewrite_file_index action job add partitions option (#6133)
---
 docs/content/flink/action-jars.md                  |  1 +
 .../flink/action/RewriteFileIndexAction.java       |  9 +-
 .../action/RewriteFileIndexActionFactory.java      |  9 +-
 .../flink/action/RewriteFileIndexActionITCase.java | 95 +++++++++++++++++++++-
 4 files changed, 106 insertions(+), 8 deletions(-)

diff --git a/docs/content/flink/action-jars.md b/docs/content/flink/action-jars.md
index 0f7e5bf420..8bc1452299 100644
--- a/docs/content/flink/action-jars.md
+++ b/docs/content/flink/action-jars.md
@@ -294,6 +294,7 @@ Run the following command to submit a 'rewrite_file_index' job for the table.
     rewrite_file_index \
     --warehouse <warehouse-path> \
     --identifier <database.table> \
+    [--partitions <partition_spec>] \
     [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
 ```
 
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java
index dd2544093e..b4c9bf0d24 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java
@@ -27,16 +27,19 @@ import java.util.Map;
 /** Rewrite-file-index action for Flink. */
 public class RewriteFileIndexAction extends ActionBase {
 
-    private String identifier;
+    private final String identifier;
+    private final String partitions;
 
-    public RewriteFileIndexAction(String identifier, Map<String, String> 
catalogConfig) {
+    public RewriteFileIndexAction(
+            String identifier, String partitions, Map<String, String> 
catalogConfig) {
         super(catalogConfig);
         this.identifier = identifier;
+        this.partitions = partitions;
     }
 
     public void run() throws Exception {
         RewriteFileIndexProcedure rewriteFileIndexProcedure = new 
RewriteFileIndexProcedure();
         rewriteFileIndexProcedure.withCatalog(catalog);
-        rewriteFileIndexProcedure.call(new DefaultProcedureContext(env), 
identifier, "");
+        rewriteFileIndexProcedure.call(new DefaultProcedureContext(env), 
identifier, partitions);
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java
index 067dbbeccf..1a79f9d8c4 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java
@@ -28,6 +28,8 @@ public class RewriteFileIndexActionFactory implements ActionFactory {
 
     private static final String IDENTIFIER_KEY = "identifier";
 
+    private static final String PARTITIONS_KEY = "partitions";
+
     @Override
     public String identifier() {
         return IDENTIFIER;
@@ -37,8 +39,10 @@ public class RewriteFileIndexActionFactory implements ActionFactory {
     public Optional<Action> create(MultipleParameterToolAdapter params) {
         Map<String, String> catalogConfig = catalogConfigMap(params);
         String identifier = params.get(IDENTIFIER_KEY);
+        String partitions = params.get(PARTITIONS_KEY);
 
-        RewriteFileIndexAction action = new RewriteFileIndexAction(identifier, 
catalogConfig);
+        RewriteFileIndexAction action =
+                new RewriteFileIndexAction(identifier, partitions, 
catalogConfig);
 
         return Optional.of(action);
     }
@@ -52,6 +56,7 @@ public class RewriteFileIndexActionFactory implements ActionFactory {
         System.out.println(
                 "  rewrite_file_index \\\n"
                         + "--warehouse <warehouse_path> \\\n"
-                        + "--identifier <database.table>");
+                        + "--identifier <database.table> \\\n"
+                        + "[--partitions <partition_spec>]");
     }
 }
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
index dc6e5523b0..fbfd56b000 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java
@@ -65,14 +65,17 @@ public class RewriteFileIndexActionITCase extends ActionITCaseBase {
                         + " 'write-only' = 'true',"
                         + " 'bucket' = '-1'"
                         + ")");
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
         tEnv.executeSql(
                 "INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100', 
16, '20221208'), (1, '100', 15, '20221209')");
-        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
         tEnv.executeSql("ALTER TABLE T SET 
('file-index.bloom-filter.columns'='k,v')");
 
         if (ThreadLocalRandom.current().nextBoolean()) {
             StreamExecutionEnvironment env =
-                    
streamExecutionEnvironmentBuilder().streamingMode().build();
+                    streamExecutionEnvironmentBuilder()
+                            .batchMode()
+                            .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                            .build();
             createAction(
                             RewriteFileIndexAction.class,
                             "rewrite_file_index",
@@ -83,11 +86,97 @@ public class RewriteFileIndexActionITCase extends ActionITCaseBase {
                     .withStreamExecutionEnvironment(env)
                     .run();
         } else {
-            executeSQL("CALL sys.rewrite_file_index('test_db.T')");
+            executeSQL("CALL sys.rewrite_file_index('test_db.T')", false, 
true);
         }
 
         FileStoreTable table = (FileStoreTable) catalog.getTable(new 
Identifier("test_db", "T"));
         List<ManifestEntry> list = table.store().newScan().plan().files();
+        testIndexFile(list, table);
+    }
+
+    @Test
+    public void testFileIndexAddIndexWithSpecifiedPartition() throws Exception 
{
+        TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build();
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG PAIMON WITH ('type'='paimon', 
'warehouse'='%s');",
+                        warehouse));
+        tEnv.useCatalog("PAIMON");
+
+        tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db;").await();
+        tEnv.executeSql("USE test_db").await();
+
+        tEnv.executeSql(
+                "CREATE TABLE T ("
+                        + " k INT,"
+                        + " v STRING,"
+                        + " hh INT,"
+                        + " dt STRING"
+                        + ") PARTITIONED BY (dt, hh) WITH ("
+                        + " 'write-only' = 'true',"
+                        + " 'bucket' = '-1'"
+                        + ")");
+        tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true);
+        tEnv.executeSql(
+                "INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100', 
16, '20221208'), (1, '100', 15, '20221209')");
+        tEnv.executeSql("ALTER TABLE T SET 
('file-index.bloom-filter.columns'='k,v')");
+
+        if (ThreadLocalRandom.current().nextBoolean()) {
+            StreamExecutionEnvironment env =
+                    streamExecutionEnvironmentBuilder()
+                            .batchMode()
+                            .setConf(TableConfigOptions.TABLE_DML_SYNC, true)
+                            .build();
+            createAction(
+                            RewriteFileIndexAction.class,
+                            "rewrite_file_index",
+                            "--warehouse",
+                            warehouse,
+                            "--identifier",
+                            "test_db.T",
+                            "--partitions",
+                            "dt=20221208")
+                    .withStreamExecutionEnvironment(env)
+                    .run();
+        } else {
+            executeSQL("CALL sys.rewrite_file_index('test_db.T', 
'dt=20221208')", false, true);
+        }
+
+        FileStoreTable table = (FileStoreTable) catalog.getTable(new 
Identifier("test_db", "T"));
+
+        List<ManifestEntry> partition20221209List =
+                table.store()
+                        .newScan()
+                        .withPartitionFilter(
+                                new 
PredicateBuilder(table.schema().logicalPartitionType())
+                                        .equal(0, 
BinaryString.fromString("20221209")))
+                        .plan()
+                        .files();
+
+        partition20221209List.forEach(
+                entry -> {
+                    List<String> extraFiles =
+                            entry.file().extraFiles().stream()
+                                    .filter(s -> 
s.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX))
+                                    .collect(Collectors.toList());
+
+                    // Means no index file
+                    Assertions.assertThat(extraFiles.size()).isEqualTo(0);
+                });
+
+        List<ManifestEntry> partition20221208List =
+                table.store()
+                        .newScan()
+                        .withPartitionFilter(
+                                new 
PredicateBuilder(table.schema().logicalPartitionType())
+                                        .equal(0, 
BinaryString.fromString("20221208")))
+                        .plan()
+                        .files();
+
+        testIndexFile(partition20221208List, table);
+    }
+
+    private void testIndexFile(List<ManifestEntry> list, FileStoreTable table) 
throws Exception {
         for (ManifestEntry entry : list) {
             List<String> extraFiles =
                     entry.file().extraFiles().stream()

Reply via email to