This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push: new b291683c1b [flink] Rewrite_file_index action job add partitions option (#6133) b291683c1b is described below commit b291683c1b393e742246839176e699efab4bd095 Author: Tom <1547107...@qq.com> AuthorDate: Mon Aug 25 09:58:08 2025 +0800 [flink] Rewrite_file_index action job add partitions option (#6133) --- docs/content/flink/action-jars.md | 1 + .../flink/action/RewriteFileIndexAction.java | 9 +- .../action/RewriteFileIndexActionFactory.java | 9 +- .../flink/action/RewriteFileIndexActionITCase.java | 95 +++++++++++++++++++++- 4 files changed, 106 insertions(+), 8 deletions(-) diff --git a/docs/content/flink/action-jars.md b/docs/content/flink/action-jars.md index 0f7e5bf420..8bc1452299 100644 --- a/docs/content/flink/action-jars.md +++ b/docs/content/flink/action-jars.md @@ -294,6 +294,7 @@ Run the following command to submit a 'rewrite_file_index' job for the table. rewrite_file_index \ --warehouse <warehouse-path> \ --identifier <database.table> \ + [--partitions <partition_spec>] \ [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]] ``` diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java index dd2544093e..b4c9bf0d24 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexAction.java @@ -27,16 +27,19 @@ import java.util.Map; /** Rewrite-file-index action for Flink. */ public class RewriteFileIndexAction extends ActionBase { - private String identifier; + private final String identifier; + private final String partitions; - public RewriteFileIndexAction(String identifier, Map<String, String> catalogConfig) { + public RewriteFileIndexAction( + String identifier, String partitions, Map<String, String> catalogConfig) { super(catalogConfig); this.identifier = identifier; + this.partitions = partitions; } public void run() throws Exception { RewriteFileIndexProcedure rewriteFileIndexProcedure = new RewriteFileIndexProcedure(); rewriteFileIndexProcedure.withCatalog(catalog); - rewriteFileIndexProcedure.call(new DefaultProcedureContext(env), identifier, ""); + rewriteFileIndexProcedure.call(new DefaultProcedureContext(env), identifier, partitions); } } diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java index 067dbbeccf..1a79f9d8c4 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RewriteFileIndexActionFactory.java @@ -28,6 +28,8 @@ public class RewriteFileIndexActionFactory implements ActionFactory { private static final String IDENTIFIER_KEY = "identifier"; + private static final String PARTITIONS_KEY = "partitions"; + @Override public String identifier() { return IDENTIFIER; @@ -37,8 +39,10 @@ public class RewriteFileIndexActionFactory implements ActionFactory { public Optional<Action> create(MultipleParameterToolAdapter params) { Map<String, String> catalogConfig = catalogConfigMap(params); String identifier = params.get(IDENTIFIER_KEY); + String partitions = params.get(PARTITIONS_KEY); - RewriteFileIndexAction action = new RewriteFileIndexAction(identifier, catalogConfig); + RewriteFileIndexAction action = + new RewriteFileIndexAction(identifier, partitions, catalogConfig); return Optional.of(action); } @@ -52,6 +56,7 @@ public class RewriteFileIndexActionFactory implements ActionFactory { System.out.println( " rewrite_file_index \\\n" + "--warehouse <warehouse_path> \\\n" - + "--identifier <database.table>"); + + "--identifier <database.table> \\\n" + + "[--partitions <partition_spec>]"); } } diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java index dc6e5523b0..fbfd56b000 100644 --- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java +++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/RewriteFileIndexActionITCase.java @@ -65,14 +65,17 @@ public class RewriteFileIndexActionITCase extends ActionITCaseBase { + " 'write-only' = 'true'," + " 'bucket' = '-1'" + ")"); + tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); tEnv.executeSql( "INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100', 16, '20221208'), (1, '100', 15, '20221209')"); - tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); tEnv.executeSql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k,v')"); if (ThreadLocalRandom.current().nextBoolean()) { StreamExecutionEnvironment env = - streamExecutionEnvironmentBuilder().streamingMode().build(); + streamExecutionEnvironmentBuilder() + .batchMode() + .setConf(TableConfigOptions.TABLE_DML_SYNC, true) + .build(); createAction( RewriteFileIndexAction.class, "rewrite_file_index", @@ -83,11 +86,97 @@ public class RewriteFileIndexActionITCase extends ActionITCaseBase { .withStreamExecutionEnvironment(env) .run(); } else { - executeSQL("CALL sys.rewrite_file_index('test_db.T')"); + executeSQL("CALL sys.rewrite_file_index('test_db.T')", false, true); } FileStoreTable table = (FileStoreTable) catalog.getTable(new Identifier("test_db", "T")); List<ManifestEntry> list = table.store().newScan().plan().files(); + testIndexFile(list, table); + } + + @Test + public void testFileIndexAddIndexWithSpecifiedPartition() throws Exception { + TableEnvironment tEnv = tableEnvironmentBuilder().batchMode().build(); + tEnv.executeSql( + String.format( + "CREATE CATALOG PAIMON WITH ('type'='paimon', 'warehouse'='%s');", + warehouse)); + tEnv.useCatalog("PAIMON"); + + tEnv.executeSql("CREATE DATABASE IF NOT EXISTS test_db;").await(); + tEnv.executeSql("USE test_db").await(); + + tEnv.executeSql( + "CREATE TABLE T (" + + " k INT," + + " v STRING," + + " hh INT," + + " dt STRING" + + ") PARTITIONED BY (dt, hh) WITH (" + + " 'write-only' = 'true'," + + " 'bucket' = '-1'" + + ")"); + tEnv.getConfig().set(TableConfigOptions.TABLE_DML_SYNC, true); + tEnv.executeSql( + "INSERT INTO T VALUES (1, '100', 15, '20221208'), (1, '100', 16, '20221208'), (1, '100', 15, '20221209')"); + tEnv.executeSql("ALTER TABLE T SET ('file-index.bloom-filter.columns'='k,v')"); + + if (ThreadLocalRandom.current().nextBoolean()) { + StreamExecutionEnvironment env = + streamExecutionEnvironmentBuilder() + .batchMode() + .setConf(TableConfigOptions.TABLE_DML_SYNC, true) + .build(); + createAction( + RewriteFileIndexAction.class, + "rewrite_file_index", + "--warehouse", + warehouse, + "--identifier", + "test_db.T", + "--partitions", + "dt=20221208") + .withStreamExecutionEnvironment(env) + .run(); + } else { + executeSQL("CALL sys.rewrite_file_index('test_db.T', 'dt=20221208')", false, true); + } + + FileStoreTable table = (FileStoreTable) catalog.getTable(new Identifier("test_db", "T")); + + List<ManifestEntry> partition20221209List = + table.store() + .newScan() + .withPartitionFilter( + new PredicateBuilder(table.schema().logicalPartitionType()) + .equal(0, BinaryString.fromString("20221209"))) + .plan() + .files(); + + partition20221209List.forEach( + entry -> { + List<String> extraFiles = + entry.file().extraFiles().stream() + .filter(s -> s.endsWith(DataFilePathFactory.INDEX_PATH_SUFFIX)) + .collect(Collectors.toList()); + + // Means no index file + Assertions.assertThat(extraFiles.size()).isEqualTo(0); + }); + + List<ManifestEntry> partition20221208List = + table.store() + .newScan() + .withPartitionFilter( + new PredicateBuilder(table.schema().logicalPartitionType()) + .equal(0, BinaryString.fromString("20221208"))) + .plan() + .files(); + + testIndexFile(partition20221208List, table); + } + + private void testIndexFile(List<ManifestEntry> list, FileStoreTable table) throws Exception { for (ManifestEntry entry : list) { List<String> extraFiles = entry.file().extraFiles().stream()