This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 6d9883e5c0f [Feature](Iceberg) Implement expire_snapshots procedure
for Iceberg tables (#59979)
6d9883e5c0f is described below
commit 6d9883e5c0fea605d685d87a93613339e4c0a996
Author: Socrates <[email protected]>
AuthorDate: Wed Feb 4 23:50:28 2026 +0800
[Feature](Iceberg) Implement expire_snapshots procedure for Iceberg tables
(#59979)
### What problem does this PR solve?
- Issue Number: #58199
## Summary
This PR implements the `expire_snapshots` procedure for Iceberg tables,
following the Apache Iceberg Spark procedure specification. This
procedure removes old snapshots from Iceberg tables to free up storage
space and improve metadata performance.
## Changes
### Main Implementation
- **File:**
`fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExpireSnapshotsAction.java`
- Implemented `executeAction()` method to expire snapshots using
Iceberg's `ExpireSnapshots` API
- Added `getResultSchema()` method returning 6-column output matching
Spark's schema
- Added `parseTimestamp()` helper method to support ISO datetime and
milliseconds formats
- Updated validation to allow `snapshot_ids` as a standalone parameter
- Fixed `retain_last` behavior: when specified alone, automatically sets
`expireOlderThan` to current time
### Supported Parameters
| Parameter | Description |
|-----------|-------------|
| `older_than` | Timestamp before which snapshots will be removed (ISO
datetime or milliseconds) |
| `retain_last` | Number of ancestor snapshots to preserve |
| `snapshot_ids` | Comma-separated list of specific snapshot IDs to
expire |
| `max_concurrent_deletes` | Size of thread pool for delete operations |
| `clean_expired_metadata` | When true, cleans up unused partition specs
and schemas |
### Output Schema
The procedure returns 6 columns:
- `deleted_data_files_count`
- `deleted_position_delete_files_count`
- `deleted_equality_delete_files_count`
- `deleted_manifest_files_count`
- `deleted_manifest_lists_count`
- `deleted_statistics_files_count`
### Test Updates
- **File:**
`regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy`
- Added functional tests for `expire_snapshots` with `retain_last`
parameter
- Added validation tests for `snapshot_ids` parameter
- Updated error message expectations
## Usage Example
```sql
-- Expire snapshots, keeping only the last 2
ALTER TABLE catalog.db.table EXECUTE expire_snapshots("retain_last" = "2");
-- Expire snapshots older than a specific timestamp
ALTER TABLE catalog.db.table EXECUTE expire_snapshots("older_than" =
"2024-01-01T00:00:00");
-- Expire specific snapshots by ID
ALTER TABLE catalog.db.table EXECUTE expire_snapshots("snapshot_ids" =
"123456789,987654321");
-- Combine parameters
ALTER TABLE catalog.db.table EXECUTE expire_snapshots("older_than" =
"2024-06-01T00:00:00", "retain_last" = "5");
```
---
.../action/IcebergExpireSnapshotsAction.java | 221 ++++++++++++++-
.../action/test_iceberg_expire_snapshots.out | 10 +
.../action/test_iceberg_execute_actions.groovy | 99 -------
.../action/test_iceberg_expire_snapshots.groovy | 299 +++++++++++++++++++++
.../iceberg_branch_retention_and_snapshot.groovy | 18 +-
5 files changed, 532 insertions(+), 115 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExpireSnapshotsAction.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExpireSnapshotsAction.java
index 77ea784eadc..0937af8ba4c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExpireSnapshotsAction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExpireSnapshotsAction.java
@@ -17,20 +17,41 @@
package org.apache.doris.datasource.iceberg.action;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
import org.apache.doris.common.AnalysisException;
import org.apache.doris.common.ArgumentParsers;
-import org.apache.doris.common.DdlException;
import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
import org.apache.doris.info.PartitionNamesInfo;
import org.apache.doris.nereids.trees.expressions.Expression;
+import com.google.common.collect.Lists;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.ExpireSnapshots;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.ManifestFiles;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.SupportsBulkOperations;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
import java.time.LocalDateTime;
+import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicLong;
/**
* Iceberg expire snapshots action implementation.
@@ -39,10 +60,10 @@ import java.util.Optional;
* and improve metadata performance.
*/
public class IcebergExpireSnapshotsAction extends BaseIcebergAction {
+ private static final Logger LOG =
LogManager.getLogger(IcebergExpireSnapshotsAction.class);
public static final String OLDER_THAN = "older_than";
public static final String RETAIN_LAST = "retain_last";
public static final String MAX_CONCURRENT_DELETES =
"max_concurrent_deletes";
- public static final String STREAM_RESULTS = "stream_results";
public static final String SNAPSHOT_IDS = "snapshot_ids";
public static final String CLEAN_EXPIRED_METADATA =
"clean_expired_metadata";
@@ -62,11 +83,9 @@ public class IcebergExpireSnapshotsAction extends
BaseIcebergAction {
"Number of ancestor snapshots to preserve regardless of
older_than",
null, ArgumentParsers.positiveInt(RETAIN_LAST));
namedArguments.registerOptionalArgument(MAX_CONCURRENT_DELETES,
- "Size of the thread pool used for delete file actions",
- null, ArgumentParsers.positiveInt(MAX_CONCURRENT_DELETES));
- namedArguments.registerOptionalArgument(STREAM_RESULTS,
- "When true, deletion files will be sent to Spark driver by RDD
partition",
- null, ArgumentParsers.booleanValue(STREAM_RESULTS));
+ "Size of the thread pool used for delete file actions (0
disables, "
+ + "ignored for FileIOs that support bulk deletes)",
+ 0, ArgumentParsers.intRange(MAX_CONCURRENT_DELETES, 0,
Integer.MAX_VALUE));
namedArguments.registerOptionalArgument(SNAPSHOT_IDS,
"Array of snapshot IDs to expire",
null, ArgumentParsers.nonEmptyString(SNAPSHOT_IDS));
@@ -103,9 +122,24 @@ public class IcebergExpireSnapshotsAction extends
BaseIcebergAction {
throw new AnalysisException("retain_last must be at least 1");
}
- // At least one of older_than or retain_last must be specified for
validation
- if (olderThan == null && retainLast == null) {
- throw new AnalysisException("At least one of 'older_than' or
'retain_last' must be specified");
+ // Get snapshot_ids for validation
+ String snapshotIds = namedArguments.getString(SNAPSHOT_IDS);
+
+ // Validate snapshot_ids format if provided
+ if (snapshotIds != null) {
+ for (String idStr : snapshotIds.split(",")) {
+ try {
+ Long.parseLong(idStr.trim());
+ } catch (NumberFormatException e) {
+ throw new AnalysisException("Invalid snapshot_id format: "
+ idStr.trim());
+ }
+ }
+ }
+
+ // At least one of older_than, retain_last, or snapshot_ids must be
specified
+ if (olderThan == null && retainLast == null && snapshotIds == null) {
+ throw new AnalysisException("At least one of 'older_than',
'retain_last', or "
+ + "'snapshot_ids' must be specified");
}
// Iceberg procedures don't support partitions or where conditions
@@ -115,7 +149,172 @@ public class IcebergExpireSnapshotsAction extends
BaseIcebergAction {
@Override
protected List<String> executeAction(TableIf table) throws UserException {
- throw new DdlException("Iceberg expire_snapshots procedure is not
implemented yet");
+ Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+
+ // Parse parameters
+ String olderThan = namedArguments.getString(OLDER_THAN);
+ Integer retainLast = namedArguments.getInt(RETAIN_LAST);
+ String snapshotIdsStr = namedArguments.getString(SNAPSHOT_IDS);
+ Boolean cleanExpiredMetadata =
namedArguments.getBoolean(CLEAN_EXPIRED_METADATA);
+ Integer maxConcurrentDeletes =
namedArguments.getInt(MAX_CONCURRENT_DELETES);
+
+ // Track deleted file counts using callbacks (matching Spark's
6-column schema)
+ AtomicLong deletedDataFilesCount = new AtomicLong(0);
+ AtomicLong deletedPositionDeleteFilesCount = new AtomicLong(0);
+ AtomicLong deletedEqualityDeleteFilesCount = new AtomicLong(0);
+ AtomicLong deletedManifestFilesCount = new AtomicLong(0);
+ AtomicLong deletedManifestListsCount = new AtomicLong(0);
+ AtomicLong deletedStatisticsFilesCount = new AtomicLong(0);
+
+ ExecutorService deleteExecutor = null;
+ try {
+ Map<String, FileContent> deleteFileContentByPath =
+ buildDeleteFileContentMap(icebergTable);
+ ExpireSnapshots expireSnapshots = icebergTable.expireSnapshots();
+
+ // Configure older_than timestamp
+ // If retain_last is specified without older_than, use current
time as the cutoff
+ // This is because Iceberg's retainLast only works in conjunction
with expireOlderThan
+ if (olderThan != null) {
+ long timestampMillis = parseTimestamp(olderThan);
+ expireSnapshots.expireOlderThan(timestampMillis);
+ } else if (retainLast != null && snapshotIdsStr == null) {
+ // When only retain_last is specified, expire all snapshots
older than now
+ // but keep at least retain_last snapshots
+ expireSnapshots.expireOlderThan(System.currentTimeMillis());
+ }
+
+ // Configure retain_last
+ if (retainLast != null) {
+ expireSnapshots.retainLast(retainLast);
+ }
+
+ // Configure specific snapshot IDs to expire
+ if (snapshotIdsStr != null) {
+ for (String idStr : snapshotIdsStr.split(",")) {
+
expireSnapshots.expireSnapshotId(Long.parseLong(idStr.trim()));
+ }
+ }
+
+ // Configure clean expired metadata
+ if (cleanExpiredMetadata != null) {
+ expireSnapshots.cleanExpiredMetadata(cleanExpiredMetadata);
+ }
+
+ // Set up ExecutorService for concurrent deletes if specified
+ if (maxConcurrentDeletes > 0) {
+ if (icebergTable.io() instanceof SupportsBulkOperations) {
+ LOG.warn("max_concurrent_deletes only works with FileIOs
that do not support "
+ + "bulk deletes. This table is currently using {}
which supports bulk deletes "
+ + "so the parameter will be ignored.",
+ icebergTable.io().getClass().getName());
+ } else {
+ deleteExecutor =
Executors.newFixedThreadPool(maxConcurrentDeletes);
+ expireSnapshots.executeDeleteWith(deleteExecutor);
+ }
+ }
+
+ // Set up delete callback to count files by type
+ expireSnapshots.deleteWith(path -> {
+ FileContent deleteContent = deleteFileContentByPath.get(path);
+ if (deleteContent == FileContent.POSITION_DELETES) {
+ deletedPositionDeleteFilesCount.incrementAndGet();
+ } else if (deleteContent == FileContent.EQUALITY_DELETES) {
+ deletedEqualityDeleteFilesCount.incrementAndGet();
+ } else if (path.contains("-m-") && path.endsWith(".avro")) {
+ deletedManifestFilesCount.incrementAndGet();
+ } else if (path.contains("snap-") && path.endsWith(".avro")) {
+ deletedManifestListsCount.incrementAndGet();
+ } else if (path.endsWith(".stats") ||
path.contains("statistics")) {
+ deletedStatisticsFilesCount.incrementAndGet();
+ } else {
+ deletedDataFilesCount.incrementAndGet();
+ }
+ icebergTable.io().deleteFile(path);
+ });
+
+ // Execute and commit
+ expireSnapshots.commit();
+
+ // Invalidate cache
+ Env.getCurrentEnv().getExtMetaCacheMgr()
+ .invalidateTableCache((ExternalTable) table);
+
+ return Lists.newArrayList(
+ String.valueOf(deletedDataFilesCount.get()),
+ String.valueOf(deletedPositionDeleteFilesCount.get()),
+ String.valueOf(deletedEqualityDeleteFilesCount.get()),
+ String.valueOf(deletedManifestFilesCount.get()),
+ String.valueOf(deletedManifestListsCount.get()),
+ String.valueOf(deletedStatisticsFilesCount.get())
+ );
+ } catch (Exception e) {
+ throw new UserException("Failed to expire snapshots: " +
e.getMessage(), e);
+ } finally {
+ // Shutdown executor if created
+ if (deleteExecutor != null) {
+ deleteExecutor.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Parse timestamp string to milliseconds since epoch.
+ * Supports ISO datetime format (yyyy-MM-ddTHH:mm:ss) or milliseconds.
+ */
+ private long parseTimestamp(String timestamp) {
+ try {
+ // Try ISO datetime format
+ LocalDateTime dateTime = LocalDateTime.parse(timestamp,
+ DateTimeFormatter.ISO_LOCAL_DATE_TIME);
+ return dateTime.atZone(ZoneId.systemDefault())
+ .toInstant().toEpochMilli();
+ } catch (DateTimeParseException e) {
+ // Try as milliseconds
+ return Long.parseLong(timestamp);
+ }
+ }
+
+ private Map<String, FileContent> buildDeleteFileContentMap(Table
icebergTable) throws UserException {
+ Map<String, FileContent> deleteFileContentByPath = new HashMap<>();
+ try {
+ for (org.apache.iceberg.Snapshot snapshot :
icebergTable.snapshots()) {
+ List<ManifestFile> deleteManifests =
snapshot.deleteManifests(icebergTable.io());
+ if (deleteManifests == null || deleteManifests.isEmpty()) {
+ continue;
+ }
+ for (ManifestFile manifest : deleteManifests) {
+ try (CloseableIterable<DeleteFile> deleteFiles =
ManifestFiles.readDeleteManifest(
+ manifest, icebergTable.io(),
icebergTable.specs())) {
+ for (DeleteFile deleteFile : deleteFiles) {
+ deleteFileContentByPath.putIfAbsent(
+ deleteFile.location(),
deleteFile.content());
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ throw new UserException("Failed to build delete file content map:
" + e.getMessage(), e);
+ }
+ return deleteFileContentByPath;
+ }
+
+ @Override
+ protected List<Column> getResultSchema() {
+ return Lists.newArrayList(
+ new Column("deleted_data_files_count", Type.BIGINT, false,
+ "Number of data files deleted"),
+ new Column("deleted_position_delete_files_count", Type.BIGINT,
false,
+ "Number of position delete files deleted"),
+ new Column("deleted_equality_delete_files_count", Type.BIGINT,
false,
+ "Number of equality delete files deleted"),
+ new Column("deleted_manifest_files_count", Type.BIGINT, false,
+ "Number of manifest files deleted"),
+ new Column("deleted_manifest_lists_count", Type.BIGINT, false,
+ "Number of manifest list files deleted"),
+ new Column("deleted_statistics_files_count", Type.BIGINT, false,
+ "Number of statistics files deleted")
+ );
}
@Override
diff --git
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.out
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.out
new file mode 100644
index 00000000000..4ef68442a19
--- /dev/null
+++
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.out
@@ -0,0 +1,10 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !expire_snapshots_result --
+0 0 0 0 2 0
+
+-- !after_expire_snapshots --
+1 data1
+2 data2
+3 data3
+4 data4
+
diff --git
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
index 00906456633..223d6f78297 100644
---
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
@@ -395,15 +395,6 @@ suite("test_iceberg_optimize_actions_ddl",
"p0,external,doris,external_docker,ex
qt_after_fast_forword_branch """SELECT * FROM
test_fast_forward@branch(feature_branch) ORDER BY id"""
- // Test expire_snapshots action
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ("older_than" = "2024-01-01T00:00:00")
- """
- exception "Iceberg expire_snapshots procedure is not implemented yet"
- }
-
// Test validation - missing required property
test {
sql """
@@ -449,87 +440,6 @@ suite("test_iceberg_optimize_actions_ddl",
"p0,external,doris,external_docker,ex
exception "Missing required argument: timestamp"
}
- // Test expire_snapshots with invalid older_than timestamp
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ("older_than" = "not-a-timestamp")
- """
- exception "Invalid older_than format"
- }
-
- // Test expire_snapshots with negative timestamp
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ("older_than" = "-1000")
- """
- exception "older_than timestamp must be non-negative"
- }
-
- // Test validation - retain_last must be at least 1
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ("retain_last" = "0")
- """
- exception "retain_last must be positive, got: 0"
- }
-
- // Test expire_snapshots with invalid retain_last format
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ("retain_last" = "not-a-number")
- """
- exception "Invalid retain_last format: not-a-number"
- }
-
- // Test expire_snapshots with negative retain_last
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ("retain_last" = "-5")
- """
- exception "retain_last must be positive, got: -5"
- }
-
- // Test expire_snapshots with neither older_than nor retain_last
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ()
- """
- exception "At least one of 'older_than' or 'retain_last' must be
specified"
- }
-
- // Test expire_snapshots with valid timestamp format (milliseconds)
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ("older_than" = "1640995200000")
- """
- exception "Iceberg expire_snapshots procedure is not implemented yet"
- }
-
- // Test expire_snapshots with valid ISO datetime
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ("older_than" = "2024-01-01T12:30:45")
- """
- exception "Iceberg expire_snapshots procedure is not implemented yet"
- }
-
- // Test expire_snapshots with valid retain_last and older_than
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ("older_than" = "2024-01-01T00:00:00", "retain_last" = "5")
- """
- exception "Iceberg expire_snapshots procedure is not implemented yet"
- }
-
// Test unknown action
test {
sql """
@@ -628,15 +538,6 @@ suite("test_iceberg_optimize_actions_ddl",
"p0,external,doris,external_docker,ex
exception "Snapshot 123456789 not found in table"
}
- // Test with multiple partitions
- test {
- sql """
- ALTER TABLE ${catalog_name}.${db_name}.${table_name} EXECUTE
expire_snapshots
- ("older_than" = "2024-01-01T00:00:00") PARTITIONS (p1, p2, p3)
- """
- exception "Action 'expire_snapshots' does not support partition
specification"
- }
-
//
=====================================================================================
// Test Case 6: publish_changes action with WAP (Write-Audit-Publish) pattern
// Simplified workflow:
diff --git
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.groovy
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.groovy
new file mode 100644
index 00000000000..2470dc16b39
--- /dev/null
+++
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_expire_snapshots.groovy
@@ -0,0 +1,299 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import com.amazonaws.auth.AWSStaticCredentialsProvider
+import com.amazonaws.auth.BasicAWSCredentials
+import com.amazonaws.client.builder.AwsClientBuilder
+import com.amazonaws.services.s3.AmazonS3
+import com.amazonaws.services.s3.AmazonS3ClientBuilder
+import groovy.json.JsonSlurper
+
+suite("test_iceberg_expire_snapshots",
"p0,external,doris,external_docker,external_docker_doris") {
+ String enabled = context.config.otherConfigs.get("enableIcebergTest")
+ if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+ logger.info("disable iceberg test.")
+ return
+ }
+
+ String catalog_name = "test_iceberg_expire_snapshots"
+ String db_name = "test_db"
+ String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+ String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+ String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+ sql """drop catalog if exists ${catalog_name}"""
+ sql """
+ CREATE CATALOG ${catalog_name} PROPERTIES (
+ 'type'='iceberg',
+ 'iceberg.catalog.type'='rest',
+ 'uri' = 'http://${externalEnvIp}:${rest_port}',
+ "s3.access_key" = "admin",
+ "s3.secret_key" = "password",
+ "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+ "s3.region" = "us-east-1"
+ );"""
+
+ sql """switch ${catalog_name}"""
+ sql """CREATE DATABASE IF NOT EXISTS ${db_name} """
+ sql """use ${db_name}"""
+
+ def buildIcebergS3Client = { ->
+ def credentials = new BasicAWSCredentials("admin", "password")
+ def endpoint = "http://${externalEnvIp}:${minio_port}"
+ return AmazonS3ClientBuilder.standard()
+ .withEndpointConfiguration(new
AwsClientBuilder.EndpointConfiguration(endpoint, "us-east-1"))
+ .withCredentials(new AWSStaticCredentialsProvider(credentials))
+ .withPathStyleAccessEnabled(true)
+ .build()
+ }
+
+ def parseS3Path = { String path ->
+ int schemeIdx = path.indexOf("://")
+ assertTrue(schemeIdx > 0, "Unexpected file path: ${path}")
+ String withoutScheme = path.substring(schemeIdx + 3)
+ withoutScheme = withoutScheme.split("\\?")[0].split("#")[0]
+ int slashIdx = withoutScheme.indexOf("/")
+ String bucket = slashIdx > 0 ? withoutScheme.substring(0, slashIdx) :
withoutScheme
+ String key = slashIdx > 0 ? withoutScheme.substring(slashIdx + 1) : ""
+ return [bucket, key]
+ }
+
+ def readMetadataJson = { AmazonS3 client, String metadataPath ->
+ def (bucket, key) = parseS3Path(metadataPath)
+ def obj = client.getObject(bucket, key)
+ try {
+ String text = obj.objectContent.getText("UTF-8")
+ return new JsonSlurper().parseText(text)
+ } finally {
+ obj.objectContent.close()
+ }
+ }
+
+ //
=====================================================================================
+ // Test Case 1: expire_snapshots action with retain_last parameter
+ // Tests the ability to expire old snapshots from Iceberg tables
+ //
=====================================================================================
+ logger.info("Starting expire_snapshots test case")
+
+ // Create test table for expire_snapshots
+ sql """DROP TABLE IF EXISTS ${db_name}.test_expire_snapshots"""
+ sql """
+ CREATE TABLE ${db_name}.test_expire_snapshots (
+ id BIGINT,
+ data STRING
+ ) ENGINE=iceberg
+ """
+
+ // Insert data to create multiple snapshots
+ sql """INSERT INTO ${db_name}.test_expire_snapshots VALUES (1, 'data1')"""
+ sql """INSERT INTO ${db_name}.test_expire_snapshots VALUES (2, 'data2')"""
+ sql """INSERT INTO ${db_name}.test_expire_snapshots VALUES (3, 'data3')"""
+ sql """INSERT INTO ${db_name}.test_expire_snapshots VALUES (4, 'data4')"""
+
+ // Verify 4 snapshots exist
+ List<List<Object>> snapshotsBefore = sql """
+ SELECT snapshot_id FROM test_expire_snapshots\$snapshots ORDER BY
committed_at
+ """
+ assertTrue(snapshotsBefore.size() == 4, "Expected 4 snapshots before
expiration")
+ logger.info("Snapshots before expire: ${snapshotsBefore}")
+
+ // Test 1: expire_snapshots with retain_last=2
+ qt_expire_snapshots_result """
+ ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
+ EXECUTE expire_snapshots("retain_last" = "2")
+ """
+
+ // Verify only 2 snapshots remain
+ List<List<Object>> snapshotsAfter = sql """
+ SELECT snapshot_id FROM test_expire_snapshots\$snapshots ORDER BY
committed_at
+ """
+ assertTrue(snapshotsAfter.size() == 2, "Expected 2 snapshots after
expiration with retain_last=2")
+ logger.info("Snapshots after expire: ${snapshotsAfter}")
+
+ // Test 2: Verify data is still accessible
+ qt_after_expire_snapshots """SELECT * FROM test_expire_snapshots ORDER BY
id"""
+
+ logger.info("expire_snapshots test case completed successfully")
+
+ //
=====================================================================================
+ // Test Case 2: expire_snapshots with older_than parameter
+ //
=====================================================================================
+ // Test expire_snapshots with older_than (should work, but not expire
snapshots that are recent)
+ List<List<Object>> expireOlderThanResult = sql """
+ ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
+ EXECUTE expire_snapshots("older_than" = "2024-01-01T00:00:00")
+ """
+ logger.info("Expire older_than result: ${expireOlderThanResult}")
+
+ //
=====================================================================================
+ // Test Case 3: expire_snapshots should remove expired snapshots and clear
old files from metadata
+ //
=====================================================================================
+ String delete_files_table = "test_expire_snapshots_delete_files"
+ sql """DROP TABLE IF EXISTS ${db_name}.${delete_files_table}"""
+ sql """
+ CREATE TABLE ${db_name}.${delete_files_table} (
+ id BIGINT,
+ data STRING
+ ) ENGINE=iceberg
+ """
+ sql """INSERT INTO ${db_name}.${delete_files_table} VALUES (1, 'data1')"""
+
+ List<List<Object>> allDataFilesBefore = sql """SELECT file_path FROM
${delete_files_table}\$all_data_files"""
+ assertTrue(allDataFilesBefore.size() > 0, "Expected data files after
initial insert")
+ def oldFiles = allDataFilesBefore.collect { String.valueOf(it[0]) } as Set
+
+ // Overwrite table via Doris
+ sql """INSERT OVERWRITE TABLE ${db_name}.${delete_files_table} VALUES (3,
'data3')"""
+
+ List<List<Object>> snapshotsBeforeExpire = sql """
+ SELECT snapshot_id FROM ${delete_files_table}\$snapshots ORDER BY
committed_at
+ """
+ assertTrue(snapshotsBeforeExpire.size() >= 2, "Expected multiple snapshots
before expiration")
+ List<List<Object>> allDataFilesAfterOverwrite = sql """SELECT file_path
FROM ${delete_files_table}\$all_data_files"""
+ def allFilesAfterOverwrite = allDataFilesAfterOverwrite.collect {
String.valueOf(it[0]) } as Set
+ assertTrue(allFilesAfterOverwrite.containsAll(oldFiles),
+ "Expected old files still visible in all_data_files before
expiration")
+ long expireMillis = System.currentTimeMillis()
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${delete_files_table}
+ EXECUTE expire_snapshots("older_than" = "${expireMillis}",
"retain_last" = "1")
+ """
+ List<List<Object>> snapshotsAfterExpire = sql """
+ SELECT snapshot_id FROM ${delete_files_table}\$snapshots ORDER BY
committed_at
+ """
+ assertTrue(snapshotsAfterExpire.size() == 1,
+ "Expected 1 snapshot after expiration, got:
${snapshotsAfterExpire.size()}")
+ List<List<Object>> allDataFilesAfterExpire = sql """SELECT file_path FROM
${delete_files_table}\$all_data_files"""
+ def allFilesAfterExpire = allDataFilesAfterExpire.collect {
String.valueOf(it[0]) } as Set
+ assertTrue(oldFiles.intersect(allFilesAfterExpire).isEmpty(),
+ "Expected old files removed from all_data_files after expiration")
+
+ //
=====================================================================================
+ // Test Case 4: expire_snapshots should clean expired metadata when enabled
+ //
=====================================================================================
+ String clean_metadata_table = "test_expire_snapshots_clean_metadata"
+ sql """DROP TABLE IF EXISTS ${db_name}.${clean_metadata_table}"""
+ sql """
+ CREATE TABLE ${db_name}.${clean_metadata_table} (
+ id BIGINT,
+ data STRING
+ ) ENGINE=iceberg
+ """
+ sql """INSERT INTO ${db_name}.${clean_metadata_table} VALUES (1,
'data1')"""
+ sql """ALTER TABLE ${db_name}.${clean_metadata_table} ADD COLUMN extra_col
INT"""
+ sql """INSERT INTO ${db_name}.${clean_metadata_table} VALUES (2, 'data2',
10)"""
+
+ AmazonS3 icebergS3Client = buildIcebergS3Client()
+ String metadataBeforePath = String.valueOf((sql """
+ SELECT file FROM ${clean_metadata_table}\$metadata_log_entries
+ ORDER BY timestamp DESC LIMIT 1
+ """)[0][0])
+ def metadataBefore = readMetadataJson(icebergS3Client, metadataBeforePath)
+ long schemasBefore = ((List) metadataBefore.schemas).size()
+ assertTrue(schemasBefore >= 2, "Expected multiple schemas before cleanup")
+
+ long expireMillis2 = System.currentTimeMillis()
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.${clean_metadata_table}
+ EXECUTE expire_snapshots("older_than" = "${expireMillis2}",
"retain_last" = "1",
+ "clean_expired_metadata" = "true")
+ """
+
+ String metadataAfterPath = String.valueOf((sql """
+ SELECT file FROM ${clean_metadata_table}\$metadata_log_entries
+ ORDER BY timestamp DESC LIMIT 1
+ """)[0][0])
+ def metadataAfter = readMetadataJson(icebergS3Client, metadataAfterPath)
+ long schemasAfter = ((List) metadataAfter.schemas).size()
+ assertTrue(schemasAfter < schemasBefore,
+ "Expected schemas cleaned up, before=${schemasBefore},
after=${schemasAfter}")
+
+ //
=====================================================================================
+ // Negative Test Cases for expire_snapshots
+ //
=====================================================================================
+
+ // Test validation - missing required property (neither older_than,
retain_last, nor snapshot_ids)
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
EXECUTE expire_snapshots
+ ()
+ """
+ exception "At least one of 'older_than', 'retain_last', or
'snapshot_ids' must be specified"
+ }
+
+ // Test expire_snapshots with invalid older_than timestamp
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
EXECUTE expire_snapshots
+ ("older_than" = "not-a-timestamp")
+ """
+ exception "Invalid older_than format"
+ }
+
+ // Test expire_snapshots with negative timestamp
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
EXECUTE expire_snapshots
+ ("older_than" = "-1000")
+ """
+ exception "older_than timestamp must be non-negative"
+ }
+
+ // Test validation - retain_last must be at least 1
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
EXECUTE expire_snapshots
+ ("retain_last" = "0")
+ """
+ exception "retain_last must be positive, got: 0"
+ }
+
+ // Test expire_snapshots with invalid retain_last format
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
EXECUTE expire_snapshots
+ ("retain_last" = "not-a-number")
+ """
+ exception "Invalid retain_last format: not-a-number"
+ }
+
+ // Test expire_snapshots with negative retain_last
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
EXECUTE expire_snapshots
+ ("retain_last" = "-5")
+ """
+ exception "retain_last must be positive, got: -5"
+ }
+
+ // Test expire_snapshots with invalid snapshot_ids format
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
EXECUTE expire_snapshots
+ ("snapshot_ids" = "not-a-number")
+ """
+ exception "Invalid snapshot_id format: not-a-number"
+ }
+
+ // Test expire_snapshots with partition specification (should fail)
+ test {
+ sql """
+ ALTER TABLE ${catalog_name}.${db_name}.test_expire_snapshots
EXECUTE expire_snapshots
+ ("older_than" = "2024-01-01T00:00:00") PARTITIONS (p1, p2, p3)
+ """
+ exception "Action 'expire_snapshots' does not support partition
specification"
+ }
+}
diff --git
a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
index 489715d1ff1..ddf12b8f940 100644
---
a/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
+++
b/regression-test/suites/external_table_p0/iceberg/branch_tag/iceberg_branch_retention_and_snapshot.groovy
@@ -82,9 +82,12 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external,doris,external_docke
// Create tags to protect additional snapshots
sql """ alter table ${table_name_expire} create tag t_expire_protect AS OF
VERSION ${s_expire_1} """
- // Call expire_snapshots via Spark - should not delete snapshots referenced by branch/tag
+ // Call expire_snapshots via Doris - should not delete snapshots referenced by branch/tag
// Using a timestamp that would expire old snapshots but not those referenced by branch/tag
- spark_iceberg """CALL demo.system.expire_snapshots(table => 'test_db_retention.${table_name_expire}', older_than => TIMESTAMP '2020-01-01 00:00:00')"""
+ sql """
+ ALTER TABLE ${catalog_name}.test_db_retention.${table_name_expire}
+ EXECUTE expire_snapshots("older_than" = "2020-01-01T00:00:00")
+ """
// Verify snapshots are still accessible after expire_snapshots
qt_expire_branch_still_accessible """ select count(*) from ${table_name_expire}@branch(b_expire_test) """ // Should still have data
@@ -124,7 +127,10 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external,doris,external_docke
def snapshot_count_retain_before = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_retain_count}", "query_type" = "snapshots") """
// Call expire_snapshots - older snapshots beyond retention count may be expired, but branch snapshot should be protected
- spark_iceberg """CALL demo.system.expire_snapshots(table => 'test_db_retention.${table_name_retain_count}', older_than => TIMESTAMP '2020-01-01 00:00:00')"""
+ sql """
+ ALTER TABLE ${catalog_name}.test_db_retention.${table_name_retain_count}
+ EXECUTE expire_snapshots("older_than" = "2020-01-01T00:00:00")
+ """
// Verify branch is still accessible and has data
qt_retain_count_branch_accessible """ select count(*) from ${table_name_retain_count}@branch(b_retain_count) """ // Should have data
@@ -160,7 +166,10 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external,doris,external_docke
logger.info("Snapshot count before expire: ${snapshot_count_unref_before[0][0]}")
// Call expire_snapshots - old unreferenced snapshots should be expired
- spark_iceberg """CALL demo.system.expire_snapshots(table => 'test_db_retention.${table_name_unref}', retain_last => 1)"""
+ sql """
+ ALTER TABLE ${catalog_name}.test_db_retention.${table_name_unref}
+ EXECUTE expire_snapshots("retain_last" = "1")
+ """
// Count snapshots after expire
def snapshot_count_unref_after = sql """ select count(*) from iceberg_meta("table" = "${catalog_name}.test_db_retention.${table_name_unref}", "query_type" = "snapshots") """
@@ -180,4 +189,3 @@ suite("iceberg_branch_retention_and_snapshot", "p0,external,doris,external_docke
assertEquals(tag_ref_unref_after[0][0], 1)
}
-
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]