This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
new 94f4c3db5ed branch-4.1: [Feature](iceberg) Implement rewrite_manifests
procedure for Iceberg #59487 (#64391)
94f4c3db5ed is described below
commit 94f4c3db5ed38d6352de7a726504eb06a4811a5e
Author: Socrates <[email protected]>
AuthorDate: Tue Jun 16 19:25:20 2026 +0800
branch-4.1: [Feature](iceberg) Implement rewrite_manifests procedure for
Iceberg #59487 (#64391)
### What problem does this PR solve?
Pick #59487 to branch-4.1.
Original PR: https://github.com/apache/doris/pull/59487
This pick implements the Iceberg rewrite_manifests procedure on
branch-4.1 and includes the corresponding external Iceberg regression
test.
### Release note
Feature: Implement the rewrite_manifests procedure for Iceberg tables.
### Check List (For Author)
- Test: Unit Test
- ./run-fe-ut.sh --run
org.apache.doris.datasource.iceberg.IcebergUtilsTest
- Test: Static check
- git diff --check origin/branch-4.1...HEAD
- Behavior changed: Yes. Iceberg tables support ALTER TABLE ... EXECUTE
rewrite_manifests().
- Does this need documentation: No
### Pick Info
- Source PR: #59487
- Source merge commit: df9a8e66d2a124556a50485fd0148a0dcbc62c5e
- Pick commit: f3f2ceeb2d7
Co-authored-by: Lemon <[email protected]>
Co-authored-by: weiqiang <[email protected]>
---
.../action/IcebergExecuteActionFactory.java | 7 +-
.../action/IcebergRewriteManifestsAction.java | 109 ++++++
.../iceberg/rewrite/RewriteManifestExecutor.java | 125 +++++++
.../action/test_iceberg_rewrite_manifests.out | 43 +++
.../action/test_iceberg_rewrite_manifests.groovy | 364 +++++++++++++++++++++
5 files changed, 647 insertions(+), 1 deletion(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
index 7c208cb7db6..0d09a9ef35c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
@@ -40,6 +40,7 @@ public class IcebergExecuteActionFactory {
public static final String EXPIRE_SNAPSHOTS = "expire_snapshots";
public static final String REWRITE_DATA_FILES = "rewrite_data_files";
public static final String PUBLISH_CHANGES = "publish_changes";
+ public static final String REWRITE_MANIFESTS = "rewrite_manifests";
/**
* Create an Iceberg-specific ExecuteAction instance.
@@ -84,6 +85,9 @@ public class IcebergExecuteActionFactory {
case PUBLISH_CHANGES:
return new IcebergPublishChangesAction(properties,
partitionNamesInfo,
whereCondition);
+ case REWRITE_MANIFESTS:
+ return new IcebergRewriteManifestsAction(properties,
partitionNamesInfo,
+ whereCondition);
default:
throw new DdlException("Unsupported Iceberg procedure: " +
actionType
+ ". Supported procedures: " + String.join(", ",
getSupportedActions()));
@@ -104,7 +108,8 @@ public class IcebergExecuteActionFactory {
FAST_FORWARD,
EXPIRE_SNAPSHOTS,
REWRITE_DATA_FILES,
- PUBLISH_CHANGES
+ PUBLISH_CHANGES,
+ REWRITE_MANIFESTS
};
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
new file mode 100644
index 00000000000..430e9fe9d5e
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergRewriteManifestsAction.java
@@ -0,0 +1,109 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.action;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.ArgumentParsers;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.datasource.iceberg.rewrite.RewriteManifestExecutor;
+import org.apache.doris.info.PartitionNamesInfo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Action for rewriting Iceberg manifest files to optimize metadata layout.
+ *
+ * <p>Accepts one optional argument, {@code spec_id}, which limits the rewrite to manifests
+ * written with that partition spec id. Partition specifications and WHERE conditions are
+ * rejected during validation. The result is a single row with two columns:
+ * {@code rewritten_manifests_count} and {@code added_manifests_count}.
+ */
+public class IcebergRewriteManifestsAction extends BaseIcebergAction {
+    private static final Logger LOG = LogManager.getLogger(IcebergRewriteManifestsAction.class);
+    public static final String SPEC_ID = "spec_id";
+
+    public IcebergRewriteManifestsAction(Map<String, String> properties,
+            Optional<PartitionNamesInfo> partitionNamesInfo,
+            Optional<Expression> whereCondition) {
+        super("rewrite_manifests", properties, partitionNamesInfo, whereCondition);
+    }
+
+    /**
+     * Registers the optional {@code spec_id} argument (a non-negative int, no default value).
+     */
+    @Override
+    protected void registerIcebergArguments() {
+        // When spec_id is omitted the executor applies no spec filter, so the help text
+        // must not promise a "current spec" default.
+        namedArguments.registerOptionalArgument(SPEC_ID,
+                "Spec id of the manifests to rewrite (defaults to manifests of all specs)",
+                null,
+                ArgumentParsers.intRange(SPEC_ID, 0, Integer.MAX_VALUE));
+    }
+
+    /**
+     * Rejects partition specifications and WHERE conditions, which this action does not use.
+     *
+     * @throws UserException if either one was supplied
+     */
+    @Override
+    protected void validateIcebergAction() throws UserException {
+        validateNoPartitions();
+        validateNoWhereCondition();
+    }
+
+    /**
+     * Rewrites the manifests of the given Iceberg table.
+     *
+     * @param table the target table; it is cast to {@link IcebergExternalTable}
+     * @return a two-element row: rewritten manifest count, then added manifest count
+     * @throws UserException if the rewrite fails
+     */
+    @Override
+    protected List<String> executeAction(TableIf table) throws UserException {
+        try {
+            Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+            Snapshot current = icebergTable.currentSnapshot();
+            if (current == null) {
+                // No current snapshot means the table is empty, no manifests to rewrite
+                return Lists.newArrayList("0", "0");
+            }
+
+            // Optional spec_id parameter; null when the caller did not pass it
+            Integer specId = namedArguments.getInt(SPEC_ID);
+
+            RewriteManifestExecutor executor = new RewriteManifestExecutor();
+            RewriteManifestExecutor.Result result = executor.execute(
+                    icebergTable,
+                    (ExternalTable) table,
+                    specId);
+
+            return result.toStringList();
+        } catch (UserException e) {
+            // The executor logs and wraps its own failures in a UserException; rethrow it
+            // unchanged so the failure is not logged twice and the message is not prefixed
+            // a second time.
+            throw e;
+        } catch (Exception e) {
+            LOG.warn("Failed to rewrite manifests for table: {}", table.getName(), e);
+            throw new UserException("Rewrite manifests failed: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Describes the two INT, non-nullable result columns returned by this action.
+     */
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("rewritten_manifests_count", Type.INT, false,
+                        "Number of manifests which were re-written by this command"),
+                new Column("added_manifests_count", Type.INT, false,
+                        "Number of new manifest files which were written by this command")
+        );
+    }
+
+    @Override
+    public String getDescription() {
+        return "Rewrite Iceberg manifest files to optimize metadata layout";
+    }
+}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
new file mode 100644
index 00000000000..f2e5ab77adb
--- /dev/null
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/rewrite/RewriteManifestExecutor.java
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.rewrite;
+
+import org.apache.doris.catalog.Env;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.RewriteManifests;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.List;
+
+/**
+ * Executor for manifest rewrite operations.
+ *
+ * <p>Drives Iceberg's {@link RewriteManifests} API for one table and reports how many data
+ * manifests were targeted and how many new manifest files are present afterwards.
+ */
+public class RewriteManifestExecutor {
+    private static final Logger LOG = LogManager.getLogger(RewriteManifestExecutor.class);
+
+    /**
+     * Outcome of one rewrite: the number of targeted manifests and the number of new ones.
+     */
+    public static class Result {
+        private final int rewrittenCount;
+        private final int addedCount;
+
+        public Result(int rewrittenCount, int addedCount) {
+            this.rewrittenCount = rewrittenCount;
+            this.addedCount = addedCount;
+        }
+
+        /**
+         * Returns the counts as a two-element row: rewritten count first, then added count.
+         */
+        public List<String> toStringList() {
+            return java.util.Arrays.asList(String.valueOf(rewrittenCount),
+                    String.valueOf(addedCount));
+        }
+    }
+
+    /**
+     * Execute manifest rewrite using Iceberg RewriteManifests API.
+     *
+     * @param table the Iceberg table to operate on
+     * @param extTable the Doris table whose cached metadata is invalidated after the commit
+     * @param specId partition spec id to restrict the rewrite to, or {@code null} for no filter
+     * @return the rewritten/added counts; {@code (0, 0)} when the table has no snapshot or no
+     *         data manifest matches {@code specId}
+     * @throws UserException if any step fails; the original exception is kept as the cause
+     */
+    public Result execute(Table table, ExternalTable extTable, Integer specId) throws UserException {
+        try {
+            // An empty table has no snapshot and therefore nothing to rewrite
+            Snapshot currentSnapshot = table.currentSnapshot();
+            if (currentSnapshot == null) {
+                return new Result(0, 0);
+            }
+
+            // Data manifests that the rewrite targets (all of them when specId is null)
+            List<ManifestFile> manifestsBeforeTargeted =
+                    filterBySpecId(currentSnapshot.dataManifests(table.io()), specId);
+
+            // NOTE(review): this is the number of manifests selected for the rewrite, not a
+            // count reported back by Iceberg; confirm it matches what the commit replaces.
+            int rewrittenCount = manifestsBeforeTargeted.size();
+            if (rewrittenCount == 0) {
+                return new Result(0, 0);
+            }
+
+            // Configure rewrite operation, optionally restricting manifests by specId
+            RewriteManifests rm = table.rewriteManifests();
+            if (specId != null) {
+                final int targetSpecId = specId;
+                rm.rewriteIf(manifest -> manifest.partitionSpecId() == targetSpecId);
+            }
+            rm.commit();
+
+            // Invalidate the table cache as soon as the commit has landed, so that neither
+            // the early return below nor a failure while inspecting the new snapshot can
+            // leave stale metadata cached.
+            Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache(extTable);
+
+            Snapshot snapshotAfter = table.currentSnapshot();
+            if (snapshotAfter == null) {
+                return new Result(rewrittenCount, 0);
+            }
+
+            List<ManifestFile> manifestsAfterTargeted =
+                    filterBySpecId(snapshotAfter.dataManifests(table.io()), specId);
+            int addedCount = countNewManifests(manifestsBeforeTargeted, manifestsAfterTargeted);
+
+            return new Result(rewrittenCount, addedCount);
+        } catch (Exception e) {
+            LOG.warn("Failed to execute manifest rewrite for table: {}", extTable.getName(), e);
+            throw new UserException("Failed to rewrite manifests: " + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Counts the manifests in {@code after} whose path does not occur in {@code before}.
+     */
+    private int countNewManifests(List<ManifestFile> before, List<ManifestFile> after) {
+        java.util.Set<String> beforePaths = before.stream()
+                .map(ManifestFile::path)
+                .collect(java.util.stream.Collectors.toSet());
+        return (int) after.stream()
+                .map(ManifestFile::path)
+                .filter(path -> !beforePaths.contains(path))
+                .count();
+    }
+
+    /**
+     * Returns the manifests written with {@code specId}, or the input list itself when
+     * {@code specId} is {@code null}.
+     */
+    private List<ManifestFile> filterBySpecId(List<ManifestFile> manifests, Integer specId) {
+        if (specId == null) {
+            return manifests;
+        }
+        final int targetSpecId = specId;
+        return manifests.stream()
+                .filter(manifest -> manifest.partitionSpecId() == targetSpecId)
+                .collect(java.util.stream.Collectors.toList());
+    }
+}
diff --git
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
new file mode 100644
index 00000000000..68934a31334
--- /dev/null
+++
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !before_basic_rewrite --
+1 item1 electronics 100 2024-01-01
+2 item2 electronics 200 2024-01-02
+3 item3 books 300 2024-01-03
+4 item4 books 400 2024-01-04
+5 item5 clothing 500 2024-01-05
+6 item6 clothing 600 2024-01-06
+7 item7 electronics 700 2024-01-07
+8 item8 electronics 800 2024-01-08
+
+-- !after_basic_rewrite --
+1 item1 electronics 100 2024-01-01
+2 item2 electronics 200 2024-01-02
+3 item3 books 300 2024-01-03
+4 item4 books 400 2024-01-04
+5 item5 clothing 500 2024-01-05
+6 item6 clothing 600 2024-01-06
+7 item7 electronics 700 2024-01-07
+8 item8 electronics 800 2024-01-08
+
+-- !before_partitioned_rewrite --
+1 item1 100 2024 1
+2 item2 200 2024 1
+3 item3 300 2024 2
+4 item4 400 2024 2
+5 item5 500 2024 3
+6 item6 600 2024 3
+
+-- !after_partitioned_rewrite --
+1 item1 100 2024 1
+2 item2 200 2024 1
+3 item3 300 2024 2
+4 item4 400 2024 2
+5 item5 500 2024 3
+6 item6 600 2024 3
+
+-- !after_spec_id_rewrite --
+1 item1 100 2024 1 15
+2 item2 200 2024 1 16
+3 item3 300 2024 2 17
+4 item4 400 2024 3 18
+5 item5 500 2024 3 19
diff --git
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
new file mode 100644
index 00000000000..2e3af2e307a
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_rewrite_manifests.groovy
@@ -0,0 +1,364 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_rewrite_manifests",
"p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String catalog_name = "test_iceberg_rewrite_manifests"
+ String db_name = "test_db"
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ sql """switch ${catalog_name}"""
+ sql """CREATE DATABASE IF NOT EXISTS ${db_name} """
+ sql """use ${db_name}"""
+
+ //
=====================================================================================
+ // Test Case 1: Basic rewrite_manifests operation
+ // Tests the ability to rewrite multiple manifest files into fewer,
optimized files
+ //
=====================================================================================
+ logger.info("Starting basic rewrite_manifests test case")
+
+ def table_name = "test_rewrite_manifests_basic"
+
+ // Clean up if table exists
+ sql """DROP TABLE IF EXISTS ${db_name}.${table_name}"""
+
+ // Create a test table
+ sql """
+ CREATE TABLE ${db_name}.${table_name} (
+ id BIGINT,
+ name STRING,
+ category STRING,
+ value INT,
+ created_date DATE
+ ) ENGINE=iceberg
+ """
+ logger.info("Created test table: ${table_name}")
+
+ // Insert data in multiple single-row operations to create multiple
manifest files
+ // Each INSERT operation typically creates a new manifest file
+ sql """INSERT INTO ${db_name}.${table_name} VALUES (1, 'item1',
'electronics', 100, '2024-01-01')"""
+ sql """INSERT INTO ${db_name}.${table_name} VALUES (2, 'item2',
'electronics', 200, '2024-01-02')"""
+ sql """INSERT INTO ${db_name}.${table_name} VALUES (3, 'item3', 'books',
300, '2024-01-03')"""
+ sql """INSERT INTO ${db_name}.${table_name} VALUES (4, 'item4', 'books',
400, '2024-01-04')"""
+ sql """INSERT INTO ${db_name}.${table_name} VALUES (5, 'item5',
'clothing', 500, '2024-01-05')"""
+ sql """INSERT INTO ${db_name}.${table_name} VALUES (6, 'item6',
'clothing', 600, '2024-01-06')"""
+ sql """INSERT INTO ${db_name}.${table_name} VALUES (7, 'item7',
'electronics', 700, '2024-01-07')"""
+ sql """INSERT INTO ${db_name}.${table_name} VALUES (8, 'item8',
'electronics', 800, '2024-01-08')"""
+
+ // Verify data before rewrite
+ qt_before_basic_rewrite """SELECT * FROM ${table_name} ORDER BY id"""
+
+ // Check manifest count before rewrite
+ List<List<Object>> manifestsBefore = sql """
+ SELECT COUNT(*) as manifest_count FROM ${table_name}\$manifests
+ """
+ logger.info("Manifest count before rewrite: ${manifestsBefore}")
+
+ // Execute basic rewrite_manifests operation (no parameters - rewrite all
manifests)
+ List<List<Object>> rewriteResult = sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+ EXECUTE rewrite_manifests()
+ """
+ logger.info("Basic rewrite_manifests result: ${rewriteResult}")
+
+ // Verify the result structure
+ assertTrue(rewriteResult.size() == 1, "Expected exactly 1 result row")
+ assertTrue(rewriteResult[0].size() == 2, "Expected 2 columns in result")
+
+ // Extract rewritten and added manifest counts
+ int rewrittenCount = rewriteResult[0][0] as int
+ int addedCount = rewriteResult[0][1] as int
+
+ logger.info("Rewritten manifests: ${rewrittenCount}, Added manifests:
${addedCount}")
+ assertTrue(rewrittenCount > 0, "Should have rewritten at least 1 manifest")
+ assertTrue(addedCount >= 0, "Added count should be non-negative")
+ // Note: addedCount can be 0 if Iceberg determines manifests are already
optimal
+ // or if it reuses existing manifest files
+ if (addedCount > 0) {
+ assertTrue(addedCount <= rewrittenCount, "Added count should be <=
rewritten count (consolidation)")
+ }
+
+ // Verify data integrity after rewrite
+ qt_after_basic_rewrite """SELECT * FROM ${table_name} ORDER BY id"""
+
+ // Check manifest count after rewrite (should be fewer or equal)
+ List<List<Object>> manifestsAfter = sql """
+ SELECT COUNT(*) as manifest_count FROM ${table_name}\$manifests
+ """
+ logger.info("Manifest count after rewrite: ${manifestsAfter}")
+ assertTrue(manifestsAfter[0][0] as int <= manifestsBefore[0][0] as int,
+ "Manifest count after rewrite should be <= count before")
+
+ //
=====================================================================================
+ // Test Case 2: rewrite_manifests on partitioned table
+ // Tests manifest rewriting on a table with partition specifications
+ //
=====================================================================================
+ logger.info("Starting rewrite_manifests on partitioned table test case")
+
+ def partitioned_table = "test_rewrite_manifests_partitioned"
+
+ // Clean up if table exists
+ sql """DROP TABLE IF EXISTS ${db_name}.${partitioned_table}"""
+
+ // Create a partitioned table
+ sql """
+ CREATE TABLE ${db_name}.${partitioned_table} (
+ id BIGINT,
+ name STRING,
+ value INT,
+ year INT,
+ month INT
+ ) ENGINE=iceberg
+ PARTITION BY (year, month)()
+ """
+ logger.info("Created partitioned test table: ${partitioned_table}")
+
+ // Insert data into different partitions to create multiple manifest files
+ sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (1, 'item1',
100, 2024, 1)"""
+ sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (2, 'item2',
200, 2024, 1)"""
+ sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (3, 'item3',
300, 2024, 2)"""
+ sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (4, 'item4',
400, 2024, 2)"""
+ sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (5, 'item5',
500, 2024, 3)"""
+ sql """INSERT INTO ${db_name}.${partitioned_table} VALUES (6, 'item6',
600, 2024, 3)"""
+
+ // Verify data before rewrite
+ qt_before_partitioned_rewrite """SELECT * FROM ${partitioned_table} ORDER
BY id"""
+
+ // Check manifest count before rewrite
+ List<List<Object>> partitionedManifestsBefore = sql """
+ SELECT COUNT(*) as manifest_count FROM ${partitioned_table}\$manifests
+ """
+ logger.info("Partitioned table manifest count before rewrite:
${partitionedManifestsBefore}")
+
+ // Execute rewrite_manifests on partitioned table
+ List<List<Object>> partitionedRewriteResult = sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${partitioned_table}
+ EXECUTE rewrite_manifests()
+ """
+ logger.info("Partitioned table rewrite_manifests result:
${partitionedRewriteResult}")
+
+ // Verify result structure
+ assertTrue(partitionedRewriteResult.size() == 1, "Expected exactly 1
result row for partitioned table")
+ assertTrue(partitionedRewriteResult[0].size() == 2, "Expected 2 columns in
result for partitioned table")
+
+ int partitionedRewrittenCount = partitionedRewriteResult[0][0] as int
+ int partitionedAddedCount = partitionedRewriteResult[0][1] as int
+
+ logger.info("Partitioned table - Rewritten manifests:
${partitionedRewrittenCount}, Added manifests: ${partitionedAddedCount}")
+ assertTrue(partitionedRewrittenCount > 0, "Partitioned table should have
rewritten at least 1 manifest")
+ assertTrue(partitionedAddedCount >= 0, "Partitioned table added count
should be non-negative")
+
+ // Verify data integrity after rewrite
+ qt_after_partitioned_rewrite """SELECT * FROM ${partitioned_table} ORDER
BY id"""
+
+ // Check manifest count after rewrite
+ List<List<Object>> partitionedManifestsAfter = sql """
+ SELECT COUNT(*) as manifest_count FROM ${partitioned_table}\$manifests
+ """
+ logger.info("Partitioned table manifest count after rewrite:
${partitionedManifestsAfter}")
+ assertTrue(partitionedManifestsAfter[0][0] as int <=
partitionedManifestsBefore[0][0] as int,
+ "Partitioned table manifest count after rewrite should be <= count
before")
+
+ //
=====================================================================================
+ // Test Case 3: rewrite_manifests with spec_id parameter
+ // Tests manifest rewriting for a specific partition spec
+ //
=====================================================================================
+ logger.info("Starting rewrite_manifests with spec_id test case")
+
+ def spec_id_table = "test_rewrite_manifests_spec_id"
+
+ // Clean up if table exists
+ sql """DROP TABLE IF EXISTS ${db_name}.${spec_id_table}"""
+
+ // Create a partitioned table (this will have spec_id = 0)
+ sql """
+ CREATE TABLE ${db_name}.${spec_id_table} (
+ id BIGINT,
+ name STRING,
+ value INT,
+ year INT,
+ month INT,
+ day INT
+ ) ENGINE=iceberg
+ PARTITION BY (year, month)()
+ """
+ logger.info("Created spec_id test table: ${spec_id_table}")
+
+ // Insert data to create manifests with spec_id 0
+ sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (1, 'item1', 100,
2024, 1, 15)"""
+ sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (2, 'item2', 200,
2024, 1, 16)"""
+ sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (3, 'item3', 300,
2024, 2, 17)"""
+
+ // Check initial spec_id and manifest count
+ List<List<Object>> initialSpecs = sql """
+ SELECT partition_spec_id, COUNT(*) as manifest_count
+ FROM ${spec_id_table}\$manifests
+ GROUP BY partition_spec_id
+ ORDER BY partition_spec_id
+ """
+ logger.info("Initial spec IDs and manifest counts: ${initialSpecs}")
+
+ // Add day as a new partition field to create spec_id = 1
+ sql """ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table} ADD
PARTITION KEY day"""
+
+ // Insert more data to create manifests with spec_id 1
+ sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (4, 'item4', 400,
2024, 3, 18)"""
+ sql """INSERT INTO ${db_name}.${spec_id_table} VALUES (5, 'item5', 500,
2024, 3, 19)"""
+
+ // Check spec_ids after adding new partition field
+ List<List<Object>> allSpecs = sql """
+ SELECT partition_spec_id, COUNT(*) as manifest_count
+ FROM ${spec_id_table}\$manifests
+ GROUP BY partition_spec_id
+ ORDER BY partition_spec_id
+ """
+ logger.info("All spec IDs and manifest counts: ${allSpecs}")
+
+ if (allSpecs.size() > 0) {
+ int targetSpec = allSpecs[0][0] as int
+ int targetCount = allSpecs[0][1] as int
+ List<List<Object>> specResult = sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table}
+ EXECUTE rewrite_manifests('spec_id' = '${targetSpec}')
+ """
+ int specRewritten = specResult[0][0] as int
+ assertTrue(specRewritten == targetCount,
+ "Should rewrite exactly ${targetCount} manifests for
spec_id=${targetSpec}, got ${specRewritten}")
+ qt_after_spec_id_rewrite """SELECT * FROM ${spec_id_table} ORDER BY
id"""
+ logger.info("spec_id filtering test completed successfully")
+ } else {
+ logger.warn("Could not create spec_id, skipping spec_id filtering
test")
+ }
+
+ //
=====================================================================================
+ // Test Case 4: rewrite_manifests on empty table (no current snapshot)
+ // Tests that rewrite_manifests handles tables with no current snapshot
gracefully
+ //
=====================================================================================
+ logger.info("Starting rewrite_manifests on empty table test case")
+
+ def empty_table = "test_empty_table"
+ sql """DROP TABLE IF EXISTS ${db_name}.${empty_table}"""
+ sql """
+ CREATE TABLE ${db_name}.${empty_table} (
+ id BIGINT,
+ name STRING
+ ) ENGINE=iceberg
+ """
+ logger.info("Created empty test table: ${empty_table}")
+
+ // Execute rewrite_manifests on empty table (no current snapshot)
+ List<List<Object>> emptyTableResult = sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${empty_table}
+ EXECUTE rewrite_manifests()
+ """
+ logger.info("Empty table rewrite_manifests result: ${emptyTableResult}")
+
+ // Verify result structure
+ assertTrue(emptyTableResult.size() == 1, "Expected exactly 1 result row
for empty table")
+ assertTrue(emptyTableResult[0].size() == 2, "Expected 2 columns in result
for empty table")
+
+ // Should return 0 rewritten manifests and 0 added manifests for empty
table
+ int emptyRewrittenCount = emptyTableResult[0][0] as int
+ int emptyAddedCount = emptyTableResult[0][1] as int
+
+ assertTrue(emptyRewrittenCount == 0, "Empty table should have 0 rewritten
manifests, got: ${emptyRewrittenCount}")
+ assertTrue(emptyAddedCount == 0, "Empty table should have 0 added
manifests, got: ${emptyAddedCount}")
+
+ logger.info("Empty table test completed: rewritten=${emptyRewrittenCount},
added=${emptyAddedCount}")
+
+ //
=====================================================================================
+ // Negative Test Cases: Parameter validation and error handling
+ //
=====================================================================================
+ logger.info("Starting negative test cases for rewrite_manifests")
+
+ // Test with invalid spec_id format
+
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+ EXECUTE rewrite_manifests('spec_id' = 'not-a-number')
+ """
+ exception "Invalid"
+ }
+
+ // Test with non-existent spec_id on the spec_id table:
+ // a very large spec_id is unlikely to exist, so no manifest matches and the
+ // procedure should report 0 rewritten and 0 added manifests.
+ List<List<Object>> nonExistentSpecOnSpecTable = sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${spec_id_table}
+ EXECUTE rewrite_manifests('spec_id' = '99999')
+ """
+ assertTrue(nonExistentSpecOnSpecTable[0][0] as int == 0, "Non-existent
spec_id on spec_id_table should return 0 rewritten")
+ assertTrue(nonExistentSpecOnSpecTable[0][1] as int == 0, "Non-existent
spec_id on spec_id_table should return 0 added")
+
+ // Test with unknown parameter
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+ EXECUTE rewrite_manifests('unknown-parameter' = 'value')
+ """
+ exception "Unknown argument: unknown-parameter"
+ }
+
+ // Test rewrite_manifests with partition specification (should fail)
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+ EXECUTE rewrite_manifests() PARTITIONS (part1)
+ """
+ exception "Action 'rewrite_manifests' does not support partition
specification"
+ }
+
+ // Test rewrite_manifests with WHERE condition (should fail)
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+ EXECUTE rewrite_manifests() WHERE id > 0
+ """
+ exception "Action 'rewrite_manifests' does not support WHERE condition"
+ }
+
+ // Test on non-existent table
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.non_existent_table
+ EXECUTE rewrite_manifests()
+ """
+ exception "Table non_existent_table does not exist"
+ }
+
+ logger.info("All rewrite_manifests test cases completed successfully")
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]