This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new df9a8e66d2a [Feature](iceberg) Implement rewrite_manifests procedure 
for Iceberg (#59487)
df9a8e66d2a is described below

commit df9a8e66d2a124556a50485fd0148a0dcbc62c5e
Author: Lemon <[email protected]>
AuthorDate: Fri Jan 16 22:56:05 2026 +0800

    [Feature](iceberg) Implement rewrite_manifests procedure for Iceberg 
(#59487)
    
    ### What problem does this PR solve?
    
    Issue Number: #58199
    
    Problem Summary: This PR implements the rewrite_manifests procedure for
    Iceberg tables in Apache Doris. The feature allows users to optimize
    Iceberg table metadata by rewriting manifest files to improve query
    performance and reduce metadata overhead. This addresses the need for
    manifest file optimization in large Iceberg tables where numerous small
    manifest files can impact query planning performance.
    
    ```
    ALTER TABLE catalog.test_db.my_table EXECUTE rewrite_manifests();
    
    +---------------------------+-----------------------+
    | rewritten_manifests_count | added_manifests_count |
    +---------------------------+-----------------------+
    |                         3 |                     3 |
    +---------------------------+-----------------------+
    
    ```
    
    Co-authored-by: weiqiang <[email protected]>
---
 .../action/IcebergExecuteActionFactory.java        |   7 +-
 .../action/IcebergRewriteManifestsAction.java      | 109 ++++++
 .../iceberg/rewrite/RewriteManifestExecutor.java   | 125 +++++++
 .../action/test_iceberg_rewrite_manifests.out      |  44 +++
 .../action/test_iceberg_rewrite_manifests.groovy   | 364 +++++++++++++++++++++
 5 files changed, 648 insertions(+), 1 deletion(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
index 7c208cb7db6..0d09a9ef35c 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
@@ -40,6 +40,7 @@ public class IcebergExecuteActionFactory {
     public static final String EXPIRE_SNAPSHOTS = "expire_snapshots";
     public static final String REWRITE_DATA_FILES = "rewrite_data_files";
     public static final String PUBLISH_CHANGES = "publish_changes";
+    public static final String REWRITE_MANIFESTS = "rewrite_manifests";
 
     /**
      * Create an Iceberg-specific ExecuteAction instance.
@@ -84,6 +85,9 @@ public class IcebergExecuteActionFactory {
             case PUBLISH_CHANGES:
                 return new IcebergPublishChangesAction(properties, 
partitionNamesInfo,
                         whereCondition);
+            case REWRITE_MANIFESTS:
+                return new IcebergRewriteManifestsAction(properties, 
partitionNamesInfo,
+                        whereCondition);
             default:
                 throw new DdlException("Unsupported Iceberg procedure: " + 
actionType
                         + ". Supported procedures: " + String.join(", ", 
getSupportedActions()));
@@ -104,7 +108,8 @@ public class IcebergExecuteActionFactory {
                 FAST_FORWARD,
                 EXPIRE_SNAPSHOTS,
                 REWRITE_DATA_FILES,
-                PUBLISH_CHANGES
+                PUBLISH_CHANGES,
+                REWRITE_MANIFESTS
         };
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
new file mode 100644
index 00000000000..430e9fe9d5e
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.action;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.ArgumentParsers;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.rewrite.RewriteManifestExecutor;
+import org.apache.doris.info.PartitionNamesInfo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Action behind {@code ALTER TABLE ... EXECUTE rewrite_manifests(...)} for Iceberg tables.
+ *
+ * <p>Accepts one optional argument, {@code spec_id}. When it is given, only data manifests
+ * written with that partition spec id are targeted; when it is omitted, no spec filter is
+ * applied and every data manifest of the current snapshot is targeted. Partition lists and
+ * WHERE conditions are rejected during validation.
+ *
+ * <p>The result carries two values, in the order {@code rewritten_manifests_count},
+ * {@code added_manifests_count}.
+ */
+public class IcebergRewriteManifestsAction extends BaseIcebergAction {
+    private static final Logger LOG = LogManager.getLogger(IcebergRewriteManifestsAction.class);
+    public static final String SPEC_ID = "spec_id";
+
+    public IcebergRewriteManifestsAction(Map<String, String> properties,
+            Optional<PartitionNamesInfo> partitionNamesInfo,
+            Optional<Expression> whereCondition) {
+        super("rewrite_manifests", properties, partitionNamesInfo, whereCondition);
+    }
+
+    /**
+     * Registers the optional {@code spec_id} argument (a non-negative int, default {@code null}).
+     */
+    @Override
+    protected void registerIcebergArguments() {
+        // The description used to claim "defaults to current spec id", but a missing spec_id
+        // means "no spec filter": the executor then targets manifests of every spec.
+        namedArguments.registerOptionalArgument(SPEC_ID,
+                "Spec id of the manifests to rewrite (defaults to all spec ids)",
+                null,
+                ArgumentParsers.intRange(SPEC_ID, 0, Integer.MAX_VALUE));
+    }
+
+    /**
+     * Rejects PARTITION and WHERE clauses; this procedure works on whole-table metadata only.
+     */
+    @Override
+    protected void validateIcebergAction() throws UserException {
+        validateNoPartitions();
+        validateNoWhereCondition();
+    }
+
+    /**
+     * Runs the manifest rewrite and returns the two result values as strings.
+     *
+     * @param table the target table; it is cast to {@link IcebergExternalTable}
+     * @return {@code [rewritten_manifests_count, added_manifests_count]}; {@code ["0", "0"]}
+     *         when the table has no current snapshot
+     * @throws UserException if the rewrite fails
+     */
+    @Override
+    protected List<String> executeAction(TableIf table) throws UserException {
+        try {
+            Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+            Snapshot current = icebergTable.currentSnapshot();
+            if (current == null) {
+                // No current snapshot means the table is empty, no manifests to rewrite
+                return Lists.newArrayList("0", "0");
+            }
+
+            // Optional argument; presumably null when the caller did not pass spec_id
+            // (null is the registered default) -- TODO confirm against NamedArguments.
+            Integer specId = namedArguments.getInt(SPEC_ID);
+
+            RewriteManifestExecutor executor = new RewriteManifestExecutor();
+            RewriteManifestExecutor.Result result = executor.execute(
+                    icebergTable,
+                    (ExternalTable) table,
+                    specId);
+
+            return result.toStringList();
+        } catch (UserException e) {
+            // The executor has already logged and wrapped the failure; wrapping it again
+            // would only nest the message ("Rewrite manifests failed: Failed to rewrite ...").
+            throw e;
+        } catch (Exception e) {
+            LOG.warn("Failed to rewrite manifests for table: {}", table.getName(), e);
+            throw new UserException("Rewrite manifests failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Describes the two INT result columns returned by {@link #executeAction(TableIf)}.
+     */
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("rewritten_manifests_count", Type.INT, false,
+                        "Number of manifests which were re-written by this command"),
+                new Column("added_manifests_count", Type.INT, false,
+                        "Number of new manifest files which were written by this command")
+        );
+    }
+
+    @Override
+    public String getDescription() {
+        return "Rewrite Iceberg manifest files to optimize metadata layout";
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
new file mode 100644
index 00000000000..f2e5ab77adb
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.rewrite;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.RewriteManifests;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Runs an Iceberg manifest rewrite for one table and reports how many data manifests were
+ * replaced and how many new manifest files were produced.
+ */
+public class RewriteManifestExecutor {
+    private static final Logger LOG = LogManager.getLogger(RewriteManifestExecutor.class);
+
+    /**
+     * Outcome of one rewrite: the number of replaced ("rewritten") manifests and the number of
+     * newly written ("added") manifests.
+     */
+    public static class Result {
+        private final int rewrittenCount;
+        private final int addedCount;
+
+        public Result(int rewrittenCount, int addedCount) {
+            this.rewrittenCount = rewrittenCount;
+            this.addedCount = addedCount;
+        }
+
+        /**
+         * Returns the two counts as strings, in the order rewritten, added.
+         */
+        public List<String> toStringList() {
+            return java.util.Arrays.asList(String.valueOf(rewrittenCount),
+                    String.valueOf(addedCount));
+        }
+    }
+
+    /**
+     * Execute manifest rewrite using Iceberg RewriteManifests API.
+     *
+     * @param table the Iceberg table whose data manifests are rewritten
+     * @param extTable the Doris table whose cached metadata is invalidated after the commit
+     * @param specId partition spec id to restrict the rewrite to, or {@code null} for all specs
+     * @return the rewritten/added counts; {@code (0, 0)} when the table has no snapshot or no
+     *         data manifest matches {@code specId}
+     * @throws UserException if any step of the rewrite fails
+     */
+    public Result execute(Table table, ExternalTable extTable, Integer specId) throws UserException {
+        try {
+            // Get current snapshot and return early if table is empty
+            Snapshot currentSnapshot = table.currentSnapshot();
+            if (currentSnapshot == null) {
+                return new Result(0, 0);
+            }
+
+            // Data manifests the rewrite is allowed to touch
+            List<ManifestFile> manifestsBeforeTargeted =
+                    filterBySpecId(currentSnapshot.dataManifests(table.io()), specId);
+            if (manifestsBeforeTargeted.isEmpty()) {
+                return new Result(0, 0);
+            }
+
+            // NOTE(review): Iceberg's RewriteManifests only regroups manifest entries when a
+            // clustering function is supplied; without one the commit keeps the existing
+            // manifests, which made this procedure a no-op. A constant key packs the entries of
+            // the selected manifests into as few manifests as the target size allows -- confirm
+            // against the Iceberg version in use.
+            RewriteManifests rm = table.rewriteManifests().clusterBy(dataFile -> "");
+            if (specId != null) {
+                rm.rewriteIf(manifest -> matchesSpec(manifest, specId));
+            }
+
+            // Commit manifest rewrite
+            rm.commit();
+
+            // Invalidate right after the commit, so that neither an early return nor a failure
+            // while computing the counts below can leave stale metadata in the cache.
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(extTable);
+
+            Snapshot snapshotAfter = table.currentSnapshot();
+            if (snapshotAfter == null) {
+                return new Result(manifestsBeforeTargeted.size(), 0);
+            }
+
+            java.util.Set<String> beforePaths = manifestsBeforeTargeted.stream()
+                    .map(ManifestFile::path)
+                    .collect(java.util.stream.Collectors.toSet());
+            java.util.Set<String> afterPaths =
+                    filterBySpecId(snapshotAfter.dataManifests(table.io()), specId).stream()
+                            .map(ManifestFile::path)
+                            .collect(java.util.stream.Collectors.toSet());
+
+            // Rewritten = targeted manifests that are gone; added = manifests that are new.
+            // Counting from the path sets reports what the commit really changed, instead of
+            // assuming that every targeted manifest was replaced.
+            int rewrittenCount = (int) beforePaths.stream()
+                    .filter(path -> !afterPaths.contains(path))
+                    .count();
+            int addedCount = (int) afterPaths.stream()
+                    .filter(path -> !beforePaths.contains(path))
+                    .count();
+
+            return new Result(rewrittenCount, addedCount);
+        } catch (Exception e) {
+            LOG.warn("Failed to execute manifest rewrite for table: {}", extTable.getName(), e);
+            throw new UserException("Failed to rewrite manifests: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Returns the manifests written with {@code specId}, or the input list itself when
+     * {@code specId} is {@code null}.
+     */
+    private List<ManifestFile> filterBySpecId(List<ManifestFile> manifests, Integer specId) {
+        if (specId == null) {
+            return manifests;
+        }
+        return manifests.stream()
+                .filter(manifest -> matchesSpec(manifest, specId))
+                .collect(java.util.stream.Collectors.toList());
+    }
+
+    /**
+     * Tells whether a manifest belongs to the requested spec; a {@code null} spec matches all.
+     */
+    private static boolean matchesSpec(ManifestFile manifest, Integer specId) {
+        return specId == null || manifest.partitionSpecId() == specId;
+    }
+}
diff --git 
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
 
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
new file mode 100644
index 00000000000..0d68ae09dc0
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
@@ -0,0 +1,44 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !before_basic_rewrite --
+1      item1   electronics     100     2024-01-01
+2      item2   electronics     200     2024-01-02
+3      item3   books   300     2024-01-03
+4      item4   books   400     2024-01-04
+5      item5   clothing        500     2024-01-05
+6      item6   clothing        600     2024-01-06
+7      item7   electronics     700     2024-01-07
+8      item8   electronics     800     2024-01-08
+
+-- !after_basic_rewrite --
+1      item1   electronics     100     2024-01-01
+2      item2   electronics     200     2024-01-02
+3      item3   books   300     2024-01-03
+4      item4   books   400     2024-01-04
+5      item5   clothing        500     2024-01-05
+6      item6   clothing        600     2024-01-06
+7      item7   electronics     700     2024-01-07
+8      item8   electronics     800     2024-01-08
+
+-- !before_partitioned_rewrite --
+1      item1   100     2024    1
+2      item2   200     2024    1
+3      item3   300     2024    2
+4      item4   400     2024    2
+5      item5   500     2024    3
+6      item6   600     2024    3
+
+-- !after_partitioned_rewrite --
+1      item1   100     2024    1
+2      item2   200     2024    1
+3      item3   300     2024    2
+4      item4   400     2024    2
+5      item5   500     2024    3
+6      item6   600     2024    3
+
+-- !after_spec_id_rewrite --
+1      item1   100     2024    1       15
+2      item2   200     2024    1       16
+3      item3   300     2024    2       17
+4      item4   400     2024    3       18
+5      item5   500     2024    3       19
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
new file mode 100644
index 00000000000..de33deaeeef
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_rewrite_manifests", 
"p0,external,doris,external_docker,external_docker_doris") {
+    // Skip the whole suite unless the Iceberg docker environment is enabled
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String catalog_name = "test_iceberg_rewrite_manifests"
+    String db_name = "test_db"
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    // Recreate the REST catalog so that the suite always starts from a known state.
+    // The s3.endpoint value must be followed by a comma only: a stray ';' there ends the
+    // statement in the middle of the property list.
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """switch ${catalog_name}"""
+    sql """CREATE DATABASE IF NOT EXISTS ${db_name} """
+    sql """use ${db_name}"""
+
+    // =====================================================================================
+    // Test Case 1: Basic rewrite_manifests operation
+    // Tests the ability to rewrite multiple manifest files into fewer, optimized files
+    // =====================================================================================
+    logger.info("Starting basic rewrite_manifests test case")
+
+    def table_name = "test_rewrite_manifests_basic"
+
+    // Clean up if table exists
+    sql """DROP TABLE IF EXISTS ${db_name}.${table_name}"""
+
+    // Create a test table
+    sql """
+        CREATE TABLE ${db_name}.${table_name} (
+            id BIGINT,
+            name STRING,
+            category STRING,
+            value INT,
+            created_date DATE
+        ) ENGINE=iceberg
+    """
+    logger.info("Created test table: ${table_name}")
+
+    // Insert data in multiple single-row operations to create multiple manifest files
+    // Each INSERT operation typically creates a new manifest file
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (1, 'item1', 'electronics', 100, '2024-01-01')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (2, 'item2', 'electronics', 200, '2024-01-02')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (3, 'item3', 'books', 300, '2024-01-03')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (4, 'item4', 'books', 400, '2024-01-04')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (5, 'item5', 'clothing', 500, '2024-01-05')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (6, 'item6', 'clothing', 600, '2024-01-06')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (7, 'item7', 'electronics', 700, '2024-01-07')"""
+    sql """INSERT INTO ${db_name}.${table_name} VALUES (8, 'item8', 'electronics', 800, '2024-01-08')"""
+
+    // Verify data before rewrite
+    qt_before_basic_rewrite """SELECT * FROM ${table_name} ORDER BY id"""
+
+    // Check manifest count before rewrite
+    List<List<Object>> manifestsBefore = sql """
+        SELECT COUNT(*) as manifest_count FROM ${table_name}\$manifests
+    """
+    logger.info("Manifest count before rewrite: ${manifestsBefore}")
+
+    // Execute basic rewrite_manifests operation (no parameters - rewrite all manifests)
+    List<List<Object>> rewriteResult = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+        EXECUTE rewrite_manifests()
+    """
+    logger.info("Basic rewrite_manifests result: ${rewriteResult}")
+
+    // Verify the result structure
+    assertTrue(rewriteResult.size() == 1, "Expected exactly 1 result row")
+    assertTrue(rewriteResult[0].size() == 2, "Expected 2 columns in result")
+
+    // Extract rewritten and added manifest counts
+    int rewrittenCount = rewriteResult[0][0] as int
+    int addedCount = rewriteResult[0][1] as int
+
+    logger.info("Rewritten manifests: ${rewrittenCount}, Added manifests: ${addedCount}")
+    assertTrue(rewrittenCount > 0, "Should have rewritten at least 1 manifest")
+    assertTrue(addedCount >= 0, "Added count should be non-negative")
+    // Note: addedCount can be 0 if Iceberg determines manifests are already optimal
+    // or if it reuses existing manifest files
+    if (addedCount > 0) {
+        assertTrue(addedCount <= rewrittenCount, "Added count should be <= rewritten count (consolidation)")
+    }
+
+    // Verify data integrity after rewrite
+    qt_after_basic_rewrite """SELECT * FROM ${table_name} ORDER BY id"""
+
+    // Check manifest count after rewrite (should be fewer or equal)
+    List<List<Object>> manifestsAfter = sql """
+        SELECT COUNT(*) as manifest_count FROM ${table_name}\$manifests
+    """
+    logger.info("Manifest count after rewrite: ${manifestsAfter}")
+    assertTrue(manifestsAfter[0][0] as int <= manifestsBefore[0][0] as int,
+        "Manifest count after rewrite should be <= count before")
+
+    // =====================================================================================
+    // Test Case 2: rewrite_manifests on partitioned table
+    // Tests manifest rewriting on a table with partition specifications
+    // =====================================================================================
+    logger.info("Starting rewrite_manifests on partitioned table test case")
+
+    def partitioned_table = "test_rewrite_manifests_partitioned"
+
+    // Clean up if table exists
+    sql """DROP TABLE IF EXISTS ${db_name}.${partitioned_table}"""
+
+    // Create a partitioned table
+    sql """
+        CREATE TABLE ${db_name}.${partitioned_table} (
+            id BIGINT,
+            name STRING,
+            value INT,
+            year INT,
+            month INT
+        ) ENGINE=iceberg
+        PARTITION BY (year, month)()
+    """
+    logger.info("Created partitioned test table: ${partitioned_table}")
+
+    // Insert data into different partitions to create multiple manifest files
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (1, 'item1', 100, 2024, 1)"""
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (2, 'item2', 200, 2024, 1)"""
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (3, 'item3', 300, 2024, 2)"""
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (4, 'item4', 400, 2024, 2)"""
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (5, 'item5', 500, 2024, 3)"""
+    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (6, 'item6', 600, 2024, 3)"""
+
+    // Verify data before rewrite
+    qt_before_partitioned_rewrite """SELECT * FROM ${partitioned_table} ORDER BY id"""
+
+    // Check manifest count before rewrite
+    List<List<Object>> partitionedManifestsBefore = sql """
+        SELECT COUNT(*) as manifest_count FROM ${partitioned_table}\$manifests
+    """
+    logger.info("Partitioned table manifest count before rewrite: ${partitionedManifestsBefore}")
+
+    // Execute rewrite_manifests on partitioned table
+    List<List<Object>> partitionedRewriteResult = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${partitioned_table}
+        EXECUTE rewrite_manifests()
+    """
+    logger.info("Partitioned table rewrite_manifests result: ${partitionedRewriteResult}")
+
+    // Verify result structure
+    assertTrue(partitionedRewriteResult.size() == 1, "Expected exactly 1 result row for partitioned table")
+    assertTrue(partitionedRewriteResult[0].size() == 2, "Expected 2 columns in result for partitioned table")
+
+    int partitionedRewrittenCount = partitionedRewriteResult[0][0] as int
+    int partitionedAddedCount = partitionedRewriteResult[0][1] as int
+
+    logger.info("Partitioned table - Rewritten manifests: ${partitionedRewrittenCount}, Added manifests: ${partitionedAddedCount}")
+    assertTrue(partitionedRewrittenCount > 0, "Partitioned table should have rewritten at least 1 manifest")
+    assertTrue(partitionedAddedCount >= 0, "Partitioned table added count should be non-negative")
+
+    // Verify data integrity after rewrite
+    qt_after_partitioned_rewrite """SELECT * FROM ${partitioned_table} ORDER BY id"""
+
+    // Check manifest count after rewrite (should be fewer or equal)
+    List<List<Object>> partitionedManifestsAfter = sql """
+        SELECT COUNT(*) as manifest_count FROM ${partitioned_table}\$manifests
+    """
+    logger.info("Partitioned table manifest count after rewrite: ${partitionedManifestsAfter}")
+    assertTrue(partitionedManifestsAfter[0][0] as int <= partitionedManifestsBefore[0][0] as int,
+        "Partitioned table manifest count after rewrite should be <= count before")
+
+    // =====================================================================================
+    // Test Case 3: rewrite_manifests with spec_id parameter
+    // Tests manifest rewriting for a specific partition spec
+    // =====================================================================================
+    logger.info("Starting rewrite_manifests with spec_id test case")
+
+    def spec_id_table = "test_rewrite_manifests_spec_id"
+
+    // Clean up if table exists
+    sql """DROP TABLE IF EXISTS ${db_name}.${spec_id_table}"""
+
+    // Create a partitioned table (this will have spec_id = 0)
+    sql """
+        CREATE TABLE ${db_name}.${spec_id_table} (
+            id BIGINT,
+            name STRING,
+            value INT,
+            year INT,
+            month INT,
+            day INT
+        ) ENGINE=iceberg
+        PARTITION BY (year, month)()
+    """
+    logger.info("Created spec_id test table: ${spec_id_table}")
+
+    // Insert data to create manifests with spec_id 0
+    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (1, 'item1', 100, 2024, 1, 15)"""
+    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (2, 'item2', 200, 2024, 1, 16)"""
+    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (3, 'item3', 300, 2024, 2, 17)"""
+
+    // Check initial spec_id and manifest count
+    List<List<Object>> initialSpecs = sql """
+        SELECT partition_spec_id, COUNT(*) as manifest_count
+        FROM ${spec_id_table}\$manifests
+        GROUP BY partition_spec_id
+        ORDER BY partition_spec_id
+    """
+    logger.info("Initial spec IDs and manifest counts: ${initialSpecs}")
+
+    // Add day as a new partition field to create spec_id = 1
+    sql """ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table} ADD PARTITION KEY day"""
+
+    // Insert more data to create manifests with spec_id 1
+    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (4, 'item4', 400, 2024, 3, 18)"""
+    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (5, 'item5', 500, 2024, 3, 19)"""
+
+    // Check spec_ids after adding new partition field
+    List<List<Object>> allSpecs = sql """
+        SELECT partition_spec_id, COUNT(*) as manifest_count
+        FROM ${spec_id_table}\$manifests
+        GROUP BY partition_spec_id
+        ORDER BY partition_spec_id
+    """
+    logger.info("All spec IDs and manifest counts: ${allSpecs}")
+
+    // Rewrite only the lowest spec id found and expect exactly its manifests to be reported
+    if (allSpecs.size() > 0) {
+        int targetSpec = allSpecs[0][0] as int
+        int targetCount = allSpecs[0][1] as int
+        List<List<Object>> specResult = sql """
+            ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table}
+            EXECUTE rewrite_manifests('spec_id' = '${targetSpec}')
+        """
+        int specRewritten = specResult[0][0] as int
+        assertTrue(specRewritten == targetCount,
+            "Should rewrite exactly ${targetCount} manifests for spec_id=${targetSpec}, got ${specRewritten}")
+        qt_after_spec_id_rewrite """SELECT * FROM ${spec_id_table} ORDER BY id"""
+        logger.info("spec_id filtering test completed successfully")
+    } else {
+        logger.warn("Could not create spec_id, skipping spec_id filtering test")
+    }
+
+    // =====================================================================================
+    // Test Case 4: rewrite_manifests on empty table (no current snapshot)
+    // Tests that rewrite_manifests handles tables with no current snapshot gracefully
+    // =====================================================================================
+    logger.info("Starting rewrite_manifests on empty table test case")
+
+    def empty_table = "test_empty_table"
+    sql """DROP TABLE IF EXISTS ${db_name}.${empty_table}"""
+    sql """
+        CREATE TABLE ${db_name}.${empty_table} (
+            id BIGINT,
+            name STRING
+        ) ENGINE=iceberg
+    """
+    logger.info("Created empty test table: ${empty_table}")
+
+    // Execute rewrite_manifests on empty table (no current snapshot)
+    List<List<Object>> emptyTableResult = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${empty_table}
+        EXECUTE rewrite_manifests()
+    """
+    logger.info("Empty table rewrite_manifests result: ${emptyTableResult}")
+
+    // Verify result structure
+    assertTrue(emptyTableResult.size() == 1, "Expected exactly 1 result row for empty table")
+    assertTrue(emptyTableResult[0].size() == 2, "Expected 2 columns in result for empty table")
+
+    // Should return 0 rewritten manifests and 0 added manifests for empty table
+    int emptyRewrittenCount = emptyTableResult[0][0] as int
+    int emptyAddedCount = emptyTableResult[0][1] as int
+
+    assertTrue(emptyRewrittenCount == 0, "Empty table should have 0 rewritten manifests, got: ${emptyRewrittenCount}")
+    assertTrue(emptyAddedCount == 0, "Empty table should have 0 added manifests, got: ${emptyAddedCount}")
+
+    logger.info("Empty table test completed: rewritten=${emptyRewrittenCount}, added=${emptyAddedCount}")
+
+    // =====================================================================================
+    // Negative Test Cases: Parameter validation and error handling
+    // =====================================================================================
+    logger.info("Starting negative test cases for rewrite_manifests")
+
+    // Test with invalid spec_id format
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+            EXECUTE rewrite_manifests('spec_id' = 'not-a-number')
+        """
+        exception "Invalid"
+    }
+
+    // Test with non-existent spec_id (very large number unlikely to exist) on spec_id table:
+    // this is not an error, the procedure simply reports that nothing was rewritten
+    List<List<Object>> nonExistentSpecOnSpecTable = sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table}
+        EXECUTE rewrite_manifests('spec_id' = '99999')
+    """
+    assertTrue(nonExistentSpecOnSpecTable[0][0] as int == 0, "Non-existent spec_id on spec_id_table should return 0 rewritten")
+    assertTrue(nonExistentSpecOnSpecTable[0][1] as int == 0, "Non-existent spec_id on spec_id_table should return 0 added")
+
+    // Test with unknown parameter
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+            EXECUTE rewrite_manifests('unknown-parameter' = 'value')
+        """
+        exception "Unknown argument: unknown-parameter"
+    }
+
+    // Test rewrite_manifests with partition specification (should fail)
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+            EXECUTE rewrite_manifests() PARTITIONS (part1)
+        """
+        exception "Action 'rewrite_manifests' does not support partition specification"
+    }
+
+    // Test rewrite_manifests with WHERE condition (should fail)
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+            EXECUTE rewrite_manifests() WHERE id > 0
+        """
+        exception "Action 'rewrite_manifests' does not support WHERE condition"
+    }
+
+    // Test on non-existent table
+    test {
+        sql """
+            ALTER TABLE ${catalog_name}.${db_name}.non_existent_table
+            EXECUTE rewrite_manifests()
+        """
+        exception "Table non_existent_table does not exist"
+    }
+
+    logger.info("All rewrite_manifests test cases completed successfully")
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to