This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new df9a8e66d2a [Feature](iceberg) Implement rewrite_manifests procedure
for Iceberg (#59487)
df9a8e66d2a is described below
commit df9a8e66d2a124556a50485fd0148a0dcbc62c5e
Author: Lemon <[email protected]>
AuthorDate: Fri Jan 16 22:56:05 2026 +0800
[Feature](iceberg) Implement rewrite_manifests procedure for Iceberg
(#59487)
### What problem does this PR solve?
Issue Number: #58199
Problem Summary: This PR implements the rewrite_manifests procedure for
Iceberg tables in Apache Doris. The feature allows users to optimize
Iceberg table metadata by rewriting manifest files to improve query
performance and reduce metadata overhead. This addresses the need for
manifest file optimization in large Iceberg tables where numerous small
manifest files can impact query planning performance.
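```
ALTER TABLE catalog.test_db.my_table EXECUTE rewrite_manifests();
+---------------------------+-----------------------+
| rewritten_manifests_count | added_manifests_count |
+---------------------------+-----------------------+
| 3 | 3 |
+---------------------------+-----------------------+
```
The result row has two columns: rewritten_manifests_count (the number of existing manifests replaced by this command) and added_manifests_count (the number of new manifest files it wrote).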
Co-authored-by: weiqiang <[email protected]>
---
.../action/IcebergExecuteActionFactory.java | 7 +-
.../action/IcebergRewriteManifestsAction.java | 109 ++++++
.../iceberg/rewrite/RewriteManifestExecutor.java | 125 +++++++
.../action/test_iceberg_rewrite_manifests.out | 44 +++
.../action/test_iceberg_rewrite_manifests.groovy | 364 +++++++++++++++++++++
5 files changed, 648 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
index 7c208cb7db6..0d09a9ef35c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
@@ -40,6 +40,7 @@ public class IcebergExecuteActionFactory {
public static final String EXPIRE_SNAPSHOTS = "expire_snapshots";
public static final String REWRITE_DATA_FILES = "rewrite_data_files";
public static final String PUBLISH_CHANGES = "publish_changes";
+ public static final String REWRITE_MANIFESTS = "rewrite_manifests";
/**
* Create an Iceberg-specific ExecuteAction instance.
@@ -84,6 +85,9 @@ public class IcebergExecuteActionFactory {
case PUBLISH_CHANGES:
return new IcebergPublishChangesAction(properties,
partitionNamesInfo,
whereCondition);
+ case REWRITE_MANIFESTS:
+ return new IcebergRewriteManifestsAction(properties,
partitionNamesInfo,
+ whereCondition);
default:
throw new DdlException("Unsupported Iceberg procedure: " +
actionType
+ ". Supported procedures: " + String.join(", ",
getSupportedActions()));
@@ -104,7 +108,8 @@ public class IcebergExecuteActionFactory {
FAST_FORWARD,
EXPIRE_SNAPSHOTS,
REWRITE_DATA_FILES,
- PUBLISH_CHANGES
+ PUBLISH_CHANGES,
+ REWRITE_MANIFESTS
};
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
new file mode 100644
index 00000000000..430e9fe9d5e
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.action;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.ArgumentParsers;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.rewrite.RewriteManifestExecutor;
+import org.apache.doris.info.PartitionNamesInfo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Action for rewriting Iceberg manifest files to optimize metadata layout
+ */
+public class IcebergRewriteManifestsAction extends BaseIcebergAction {
+ private static final Logger LOG =
LogManager.getLogger(IcebergRewriteManifestsAction.class);
+ public static final String SPEC_ID = "spec_id";
+
+ public IcebergRewriteManifestsAction(Map<String, String> properties,
+ Optional<PartitionNamesInfo> partitionNamesInfo,
+ Optional<Expression> whereCondition) {
+ super("rewrite_manifests", properties, partitionNamesInfo,
whereCondition);
+ }
+
+ @Override
+ protected void registerIcebergArguments() {
+ namedArguments.registerOptionalArgument(SPEC_ID,
+ "Spec id of the manifests to rewrite (defaults to current spec
id)",
+ null,
+ ArgumentParsers.intRange(SPEC_ID, 0, Integer.MAX_VALUE));
+ }
+
+ @Override
+ protected void validateIcebergAction() throws UserException {
+ validateNoPartitions();
+ validateNoWhereCondition();
+ }
+
+ @Override
+ protected List<String> executeAction(TableIf table) throws UserException {
+ try {
+ Table icebergTable = ((IcebergExternalTable)
table).getIcebergTable();
+ Snapshot current = icebergTable.currentSnapshot();
+ if (current == null) {
+ // No current snapshot means the table is empty, no manifests
to rewrite
+ return Lists.newArrayList("0", "0");
+ }
+
+ // Get optional spec_id parameter
+ Integer specId = namedArguments.getInt(SPEC_ID);
+
+ // Execute rewrite operation
+ RewriteManifestExecutor executor = new RewriteManifestExecutor();
+ RewriteManifestExecutor.Result result = executor.execute(
+ icebergTable,
+ (ExternalTable) table,
+ specId);
+
+ return result.toStringList();
+ } catch (Exception e) {
+ LOG.warn("Failed to rewrite manifests for table: {}",
table.getName(), e);
+ throw new UserException("Rewrite manifests failed: " +
e.getMessage(), e);
+ }
+ }
+
+ @Override
+ protected List<Column> getResultSchema() {
+ return Lists.newArrayList(
+ new Column("rewritten_manifests_count", Type.INT, false,
+ "Number of manifests which were re-written by this
command"),
+ new Column("added_manifests_count", Type.INT, false,
+ "Number of new manifest files which were written by
this command")
+ );
+ }
+
+ @Override
+ public String getDescription() {
+ return "Rewrite Iceberg manifest files to optimize metadata layout";
+ }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
new file mode 100644
index 00000000000..f2e5ab77adb
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.rewrite;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.RewriteManifests;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Executor for manifest rewrite operations
+ */
+public class RewriteManifestExecutor {
+ private static final Logger LOG =
LogManager.getLogger(RewriteManifestExecutor.class);
+
+ public static class Result {
+ private final int rewrittenCount;
+ private final int addedCount;
+
+ public Result(int rewrittenCount, int addedCount) {
+ this.rewrittenCount = rewrittenCount;
+ this.addedCount = addedCount;
+ }
+
+ public java.util.List<String> toStringList() {
+ return java.util.Arrays.asList(String.valueOf(rewrittenCount),
+ String.valueOf(addedCount));
+ }
+ }
+
+ /**
+ * Execute manifest rewrite using Iceberg RewriteManifests API
+ */
+ public Result execute(Table table, ExternalTable extTable, Integer specId)
throws UserException {
+ try {
+ // Get current snapshot and return early if table is empty
+ Snapshot currentSnapshot = table.currentSnapshot();
+ if (currentSnapshot == null) {
+ return new Result(0, 0);
+ }
+
+ // Collect manifests before rewrite and filter by specId if
provided
+ List<ManifestFile> manifestsBefore =
currentSnapshot.dataManifests(table.io());
+ List<ManifestFile> manifestsBeforeTargeted =
filterBySpecId(manifestsBefore, specId);
+
+ int rewrittenCount = manifestsBeforeTargeted.size();
+
+ if (rewrittenCount == 0) {
+ return new Result(0, 0);
+ }
+
+ // Configure rewrite operation, optionally restricting manifests
by specId
+ RewriteManifests rm = table.rewriteManifests();
+
+ if (specId != null) {
+ final int targetSpecId = specId;
+ rm.rewriteIf(manifest -> manifest.partitionSpecId() ==
targetSpecId);
+ }
+
+ // Commit manifest rewrite
+ rm.commit();
+
+ // Refresh snapshot after rewrite
+ Snapshot snapshotAfter = table.currentSnapshot();
+ if (snapshotAfter == null) {
+ return new Result(rewrittenCount, 0);
+ }
+
+ // Collect manifests after rewrite and filter by specId
+ List<ManifestFile> manifestsAfter =
snapshotAfter.dataManifests(table.io());
+ List<ManifestFile> manifestsAfterTargeted =
filterBySpecId(manifestsAfter, specId);
+
+ // Compute addedCount as newly produced manifests (path not in
before set)
+ java.util.Set<String> beforePaths =
manifestsBeforeTargeted.stream()
+ .map(ManifestFile::path)
+ .collect(java.util.stream.Collectors.toSet());
+
+ int addedCount = (int) manifestsAfterTargeted.stream()
+ .map(ManifestFile::path)
+ .filter(path -> !beforePaths.contains(path))
+ .count();
+
+ // Invalidate table cache to ensure metadata is refreshed
+
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(extTable);
+
+ return new Result(rewrittenCount, addedCount);
+ } catch (Exception e) {
+ LOG.warn("Failed to execute manifest rewrite for table: {}",
extTable.getName(), e);
+ throw new UserException("Failed to rewrite manifests: " +
e.getMessage(), e);
+ }
+ }
+
+ private List<ManifestFile> filterBySpecId(List<ManifestFile> manifests,
Integer specId) {
+ if (specId == null) {
+ return manifests;
+ }
+ final int targetSpecId = specId;
+ return manifests.stream()
+ .filter(manifest -> manifest.partitionSpecId() == targetSpecId)
+ .collect(java.util.stream.Collectors.toList());
+ }
+}
diff --git
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
new file mode 100644
index 00000000000..0d68ae09dc0
--- /dev/null
+++
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
@@ -0,0 +1,44 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !before_basic_rewrite --
+1 item1 electronics 100 2024-01-01
+2 item2 electronics 200 2024-01-02
+3 item3 books 300 2024-01-03
+4 item4 books 400 2024-01-04
+5 item5 clothing 500 2024-01-05
+6 item6 clothing 600 2024-01-06
+7 item7 electronics 700 2024-01-07
+8 item8 electronics 800 2024-01-08
+
+-- !after_basic_rewrite --
+1 item1 electronics 100 2024-01-01
+2 item2 electronics 200 2024-01-02
+3 item3 books 300 2024-01-03
+4 item4 books 400 2024-01-04
+5 item5 clothing 500 2024-01-05
+6 item6 clothing 600 2024-01-06
+7 item7 electronics 700 2024-01-07
+8 item8 electronics 800 2024-01-08
+
+-- !before_partitioned_rewrite --
+1 item1 100 2024 1
+2 item2 200 2024 1
+3 item3 300 2024 2
+4 item4 400 2024 2
+5 item5 500 2024 3
+6 item6 600 2024 3
+
+-- !after_partitioned_rewrite --
+1 item1 100 2024 1
+2 item2 200 2024 1
+3 item3 300 2024 2
+4 item4 400 2024 2
+5 item5 500 2024 3
+6 item6 600 2024 3
+
+-- !after_spec_id_rewrite --
+1 item1 100 2024 1 15
+2 item2 200 2024 1 16
+3 item3 300 2024 2 17
+4 item4 400 2024 3 18
+5 item5 500 2024 3 19
+
diff --git
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
new file mode 100644
index 00000000000..de33deaeeef
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
suite("test_iceberg_rewrite_manifests", "p0,external,doris,external_docker,external_docker_doris") {
    String enabled = context.config.otherConfigs.get("enableIcebergTest")
    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
        logger.info("disable iceberg test.")
        return
    }

    String catalog_name = "test_iceberg_rewrite_manifests"
    String db_name = "test_db"
    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")

    sql """drop catalog if exists ${catalog_name}"""
    sql """
    CREATE CATALOG ${catalog_name} PROPERTIES (
        'type'='iceberg',
        'iceberg.catalog.type'='rest',
        'uri' = 'http://${externalEnvIp}:${rest_port}',
        "s3.access_key" = "admin",
        "s3.secret_key" = "password",
        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
        "s3.region" = "us-east-1"
    );"""

    sql """switch ${catalog_name}"""
    sql """CREATE DATABASE IF NOT EXISTS ${db_name} """
    sql """use ${db_name}"""

    // =====================================================================================
    // Test Case 1: Basic rewrite_manifests operation
    // Tests the ability to rewrite multiple manifest files into fewer, optimized files
    // =====================================================================================
    logger.info("Starting basic rewrite_manifests test case")

    def table_name = "test_rewrite_manifests_basic"

    // Clean up if table exists
    sql """DROP TABLE IF EXISTS ${db_name}.${table_name}"""

    // Create a test table
    sql """
        CREATE TABLE ${db_name}.${table_name} (
            id BIGINT,
            name STRING,
            category STRING,
            value INT,
            created_date DATE
        ) ENGINE=iceberg
    """
    logger.info("Created test table: ${table_name}")

    // Insert data in multiple single-row operations to create multiple manifest files.
    // Each INSERT operation typically creates a new manifest file.
    sql """INSERT INTO ${db_name}.${table_name} VALUES (1, 'item1', 'electronics', 100, '2024-01-01')"""
    sql """INSERT INTO ${db_name}.${table_name} VALUES (2, 'item2', 'electronics', 200, '2024-01-02')"""
    sql """INSERT INTO ${db_name}.${table_name} VALUES (3, 'item3', 'books', 300, '2024-01-03')"""
    sql """INSERT INTO ${db_name}.${table_name} VALUES (4, 'item4', 'books', 400, '2024-01-04')"""
    sql """INSERT INTO ${db_name}.${table_name} VALUES (5, 'item5', 'clothing', 500, '2024-01-05')"""
    sql """INSERT INTO ${db_name}.${table_name} VALUES (6, 'item6', 'clothing', 600, '2024-01-06')"""
    sql """INSERT INTO ${db_name}.${table_name} VALUES (7, 'item7', 'electronics', 700, '2024-01-07')"""
    sql """INSERT INTO ${db_name}.${table_name} VALUES (8, 'item8', 'electronics', 800, '2024-01-08')"""

    // Verify data before rewrite
    qt_before_basic_rewrite """SELECT * FROM ${table_name} ORDER BY id"""

    // Check manifest count before rewrite
    List<List<Object>> manifestsBefore = sql """
        SELECT COUNT(*) as manifest_count FROM ${table_name}\$manifests
    """
    logger.info("Manifest count before rewrite: ${manifestsBefore}")

    // Execute basic rewrite_manifests operation (no parameters - rewrite all manifests)
    List<List<Object>> rewriteResult = sql """
        ALTER TABLE ${catalog_name}.${db_name}.${table_name}
        EXECUTE rewrite_manifests()
    """
    logger.info("Basic rewrite_manifests result: ${rewriteResult}")

    // Verify the result structure
    assertTrue(rewriteResult.size() == 1, "Expected exactly 1 result row")
    assertTrue(rewriteResult[0].size() == 2, "Expected 2 columns in result")

    // Extract rewritten and added manifest counts
    int rewrittenCount = rewriteResult[0][0] as int
    int addedCount = rewriteResult[0][1] as int

    logger.info("Rewritten manifests: ${rewrittenCount}, Added manifests: ${addedCount}")
    assertTrue(rewrittenCount > 0, "Should have rewritten at least 1 manifest")
    assertTrue(addedCount >= 0, "Added count should be non-negative")
    // Note: addedCount can be 0 if Iceberg determines manifests are already optimal
    // or if it reuses existing manifest files
    if (addedCount > 0) {
        assertTrue(addedCount <= rewrittenCount, "Added count should be <= rewritten count (consolidation)")
    }

    // Verify data integrity after rewrite
    qt_after_basic_rewrite """SELECT * FROM ${table_name} ORDER BY id"""

    // Check manifest count after rewrite (should be fewer or equal)
    List<List<Object>> manifestsAfter = sql """
        SELECT COUNT(*) as manifest_count FROM ${table_name}\$manifests
    """
    logger.info("Manifest count after rewrite: ${manifestsAfter}")
    assertTrue((manifestsAfter[0][0] as int) <= (manifestsBefore[0][0] as int),
            "Manifest count after rewrite should be <= count before")

    // =====================================================================================
    // Test Case 2: rewrite_manifests on partitioned table
    // Tests manifest rewriting on a table with partition specifications
    // =====================================================================================
    logger.info("Starting rewrite_manifests on partitioned table test case")

    def partitioned_table = "test_rewrite_manifests_partitioned"

    // Clean up if table exists
    sql """DROP TABLE IF EXISTS ${db_name}.${partitioned_table}"""

    // Create a partitioned table
    sql """
        CREATE TABLE ${db_name}.${partitioned_table} (
            id BIGINT,
            name STRING,
            value INT,
            year INT,
            month INT
        ) ENGINE=iceberg
        PARTITION BY (year, month)()
    """
    logger.info("Created partitioned test table: ${partitioned_table}")

    // Insert data into different partitions to create multiple manifest files
    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (1, 'item1', 100, 2024, 1)"""
    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (2, 'item2', 200, 2024, 1)"""
    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (3, 'item3', 300, 2024, 2)"""
    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (4, 'item4', 400, 2024, 2)"""
    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (5, 'item5', 500, 2024, 3)"""
    sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (6, 'item6', 600, 2024, 3)"""

    // Verify data before rewrite
    qt_before_partitioned_rewrite """SELECT * FROM ${partitioned_table} ORDER BY id"""

    // Check manifest count before rewrite
    List<List<Object>> partitionedManifestsBefore = sql """
        SELECT COUNT(*) as manifest_count FROM ${partitioned_table}\$manifests
    """
    logger.info("Partitioned table manifest count before rewrite: ${partitionedManifestsBefore}")

    // Execute rewrite_manifests on partitioned table
    List<List<Object>> partitionedRewriteResult = sql """
        ALTER TABLE ${catalog_name}.${db_name}.${partitioned_table}
        EXECUTE rewrite_manifests()
    """
    logger.info("Partitioned table rewrite_manifests result: ${partitionedRewriteResult}")

    // Verify result structure
    assertTrue(partitionedRewriteResult.size() == 1, "Expected exactly 1 result row for partitioned table")
    assertTrue(partitionedRewriteResult[0].size() == 2, "Expected 2 columns in result for partitioned table")

    int partitionedRewrittenCount = partitionedRewriteResult[0][0] as int
    int partitionedAddedCount = partitionedRewriteResult[0][1] as int

    logger.info("Partitioned table - Rewritten manifests: ${partitionedRewrittenCount}, Added manifests: ${partitionedAddedCount}")
    assertTrue(partitionedRewrittenCount > 0, "Partitioned table should have rewritten at least 1 manifest")
    assertTrue(partitionedAddedCount >= 0, "Partitioned table added count should be non-negative")

    // Verify data integrity after rewrite
    qt_after_partitioned_rewrite """SELECT * FROM ${partitioned_table} ORDER BY id"""

    // Check manifest count after rewrite
    List<List<Object>> partitionedManifestsAfter = sql """
        SELECT COUNT(*) as manifest_count FROM ${partitioned_table}\$manifests
    """
    logger.info("Partitioned table manifest count after rewrite: ${partitionedManifestsAfter}")
    assertTrue((partitionedManifestsAfter[0][0] as int) <= (partitionedManifestsBefore[0][0] as int),
            "Partitioned table manifest count after rewrite should be <= count before")

    // =====================================================================================
    // Test Case 3: rewrite_manifests with spec_id parameter
    // Tests manifest rewriting for a specific partition spec
    // =====================================================================================
    logger.info("Starting rewrite_manifests with spec_id test case")

    def spec_id_table = "test_rewrite_manifests_spec_id"

    // Clean up if table exists
    sql """DROP TABLE IF EXISTS ${db_name}.${spec_id_table}"""

    // Create a partitioned table (this will have spec_id = 0)
    sql """
        CREATE TABLE ${db_name}.${spec_id_table} (
            id BIGINT,
            name STRING,
            value INT,
            year INT,
            month INT,
            day INT
        ) ENGINE=iceberg
        PARTITION BY (year, month)()
    """
    logger.info("Created spec_id test table: ${spec_id_table}")

    // Insert data to create manifests with spec_id 0
    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (1, 'item1', 100, 2024, 1, 15)"""
    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (2, 'item2', 200, 2024, 1, 16)"""
    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (3, 'item3', 300, 2024, 2, 17)"""

    // Check initial spec_id and manifest count
    List<List<Object>> initialSpecs = sql """
        SELECT partition_spec_id, COUNT(*) as manifest_count
        FROM ${spec_id_table}\$manifests
        GROUP BY partition_spec_id
        ORDER BY partition_spec_id
    """
    logger.info("Initial spec IDs and manifest counts: ${initialSpecs}")

    // Add day as a new partition field to create spec_id = 1
    sql """ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table} ADD PARTITION KEY day"""

    // Insert more data to create manifests with spec_id 1
    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (4, 'item4', 400, 2024, 3, 18)"""
    sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (5, 'item5', 500, 2024, 3, 19)"""

    // Check spec_ids after adding new partition field
    List<List<Object>> allSpecs = sql """
        SELECT partition_spec_id, COUNT(*) as manifest_count
        FROM ${spec_id_table}\$manifests
        GROUP BY partition_spec_id
        ORDER BY partition_spec_id
    """
    logger.info("All spec IDs and manifest counts: ${allSpecs}")

    // The spec_id filter is only meaningful on a table whose manifests span several partition
    // specs, so partition evolution must have produced a second spec.
    assertTrue(allSpecs.size() >= 2,
            "Expected at least 2 partition specs after ADD PARTITION KEY, got: ${allSpecs}")

    // Rewrite only the manifests of the oldest spec
    int targetSpec = allSpecs[0][0] as int
    int targetCount = allSpecs[0][1] as int
    List<List<Object>> specResult = sql """
        ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table}
        EXECUTE rewrite_manifests('spec_id' = '${targetSpec}')
    """
    int specRewritten = specResult[0][0] as int
    assertTrue(specRewritten == targetCount,
            "Should rewrite exactly ${targetCount} manifests for spec_id=${targetSpec}, got ${specRewritten}")

    // Manifests written with any other spec must be left untouched by a spec-filtered rewrite
    List<List<Object>> specsAfterRewrite = sql """
        SELECT partition_spec_id, COUNT(*) as manifest_count
        FROM ${spec_id_table}\$manifests
        GROUP BY partition_spec_id
        ORDER BY partition_spec_id
    """
    logger.info("Spec IDs and manifest counts after spec_id rewrite: ${specsAfterRewrite}")
    def otherSpecsBefore = allSpecs.findAll { (it[0] as int) != targetSpec }
    def otherSpecsAfter = specsAfterRewrite.findAll { (it[0] as int) != targetSpec }
    assertTrue(otherSpecsBefore == otherSpecsAfter,
            "Manifests of other specs changed: before=${otherSpecsBefore}, after=${otherSpecsAfter}")

    qt_after_spec_id_rewrite """SELECT * FROM ${spec_id_table} ORDER BY id"""
    logger.info("spec_id filtering test completed successfully")

    // =====================================================================================
    // Test Case 4: rewrite_manifests on empty table (no current snapshot)
    // Tests that rewrite_manifests handles tables with no current snapshot gracefully
    // =====================================================================================
    logger.info("Starting rewrite_manifests on empty table test case")

    def empty_table = "test_empty_table"
    sql """DROP TABLE IF EXISTS ${db_name}.${empty_table}"""
    sql """
        CREATE TABLE ${db_name}.${empty_table} (
            id BIGINT,
            name STRING
        ) ENGINE=iceberg
    """
    logger.info("Created empty test table: ${empty_table}")

    // Execute rewrite_manifests on empty table (no current snapshot)
    List<List<Object>> emptyTableResult = sql """
        ALTER TABLE ${catalog_name}.${db_name}.${empty_table}
        EXECUTE rewrite_manifests()
    """
    logger.info("Empty table rewrite_manifests result: ${emptyTableResult}")

    // Verify result structure
    assertTrue(emptyTableResult.size() == 1, "Expected exactly 1 result row for empty table")
    assertTrue(emptyTableResult[0].size() == 2, "Expected 2 columns in result for empty table")

    // Should return 0 rewritten manifests and 0 added manifests for empty table
    int emptyRewrittenCount = emptyTableResult[0][0] as int
    int emptyAddedCount = emptyTableResult[0][1] as int

    assertTrue(emptyRewrittenCount == 0, "Empty table should have 0 rewritten manifests, got: ${emptyRewrittenCount}")
    assertTrue(emptyAddedCount == 0, "Empty table should have 0 added manifests, got: ${emptyAddedCount}")

    logger.info("Empty table test completed: rewritten=${emptyRewrittenCount}, added=${emptyAddedCount}")

    // =====================================================================================
    // Negative Test Cases: Parameter validation and error handling
    // =====================================================================================
    logger.info("Starting negative test cases for rewrite_manifests")

    // Test with invalid spec_id format
    test {
        sql """
            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
            EXECUTE rewrite_manifests('spec_id' = 'not-a-number')
        """
        exception "Invalid"
    }

    // Test with non-existent spec_id (very large number unlikely to exist) on spec_id table
    List<List<Object>> nonExistentSpecOnSpecTable = sql """
        ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table}
        EXECUTE rewrite_manifests('spec_id' = '99999')
    """
    assertTrue(nonExistentSpecOnSpecTable[0][0] as int == 0, "Non-existent spec_id on spec_id_table should return 0 rewritten")
    assertTrue(nonExistentSpecOnSpecTable[0][1] as int == 0, "Non-existent spec_id on spec_id_table should return 0 added")

    // Test with unknown parameter
    test {
        sql """
            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
            EXECUTE rewrite_manifests('unknown-parameter' = 'value')
        """
        exception "Unknown argument: unknown-parameter"
    }

    // Test rewrite_manifests with partition specification (should fail)
    test {
        sql """
            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
            EXECUTE rewrite_manifests() PARTITIONS (part1)
        """
        exception "Action 'rewrite_manifests' does not support partition specification"
    }

    // Test rewrite_manifests with WHERE condition (should fail)
    test {
        sql """
            ALTER TABLE ${catalog_name}.${db_name}.${table_name}
            EXECUTE rewrite_manifests() WHERE id > 0
        """
        exception "Action 'rewrite_manifests' does not support WHERE condition"
    }

    // Test on non-existent table
    test {
        sql """
            ALTER TABLE ${catalog_name}.${db_name}.non_existent_table
            EXECUTE rewrite_manifests()
        """
        exception "Table non_existent_table does not exist"
    }

    logger.info("All rewrite_manifests test cases completed successfully")
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]