This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 6c3c551944c [Feature](Iceberg) Implement publish_changes procedure for 
Iceberg tables  (#58755)
6c3c551944c is described below

commit 6c3c551944c8672d59f5ba765879735007cfb8ba
Author: xylaaaaa <[email protected]>
AuthorDate: Tue Dec 9 22:31:02 2025 +0800

    [Feature](Iceberg) Implement publish_changes procedure for Iceberg tables  
(#58755)
    
    ### What problem does this PR solve?
    
    - **Issue Number**: part of #58199
    - **Related PR**: N/A
    
    Problem Summary:
    This PR implements the `publish_changes` action for Iceberg tables. This
    action serves as the "Publish" step in the Write-Audit-Publish (WAP)
    pattern. The procedure locates a snapshot tagged with a specific
    `wap.id` property and cherry-picks it into the current table state. This
    allows users to atomically make "staged" data visible after validation.
    
    Syntax:
    ```sql
    ALTER TABLE catalog.db.table_name EXECUTE publish_changes("wap_id" = "batch_123");
    ```
    
    Output:
    Returns `previous_snapshot_id` (STRING) and `current_snapshot_id`
    (STRING) indicating the state transition.
    
    Use cases:
    
    1.  Implement Write-Audit-Publish (WAP) workflows.
    2.  Atomically publish validated data to the main branch.
    3.  Manage staged snapshots based on custom WAP IDs.
    
    Co-authored-by: Chenjunwei <[email protected]>
---
 .../create_preinstalled_scripts/iceberg/run23.sql  |  40 +++++++
 .../action/IcebergExecuteActionFactory.java        |   7 +-
 .../action/IcebergPublishChangesAction.java        | 128 +++++++++++++++++++++
 .../action/test_iceberg_execute_actions.out        |   6 +
 .../action/test_iceberg_execute_actions.groovy     |  94 +++++++++++++++
 5 files changed, 274 insertions(+), 1 deletion(-)

diff --git 
a/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run23.sql
 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run23.sql
new file mode 100644
index 00000000000..313766e7b59
--- /dev/null
+++ 
b/docker/thirdparties/docker-compose/iceberg/scripts/create_preinstalled_scripts/iceberg/run23.sql
@@ -0,0 +1,40 @@
+
+CREATE DATABASE IF NOT EXISTS demo.wap_test;
+
+
+USE demo.wap_test;
+
+
+DROP TABLE IF EXISTS orders_wap;
+
+-- WAP-enabled orders table: 'write.wap.enabled' is set below and its rows are inserted while spark.wap.id = test_wap_001
+CREATE TABLE orders_wap (
+    order_id     INT,
+    customer_id  INT,
+    amount       DECIMAL(10, 2),
+    order_date   STRING
+)
+USING iceberg;
+ALTER TABLE wap_test.orders_wap SET TBLPROPERTIES ('write.wap.enabled'='true');
+
+SET spark.wap.id = test_wap_001;
+
+
+
+INSERT INTO orders_wap VALUES 
+    (1, 103, 150.00, '2025-12-03'),
+    (2, 104, 320.25, '2025-12-04');
+
+
+DROP TABLE IF EXISTS orders_non_wap;
+-- Non WAP-enabled orders table ('write.wap.enabled' is never set on it); spark.wap.id is still set when it is populated
+CREATE TABLE orders_non_wap (
+    order_id INT,
+    customer_id INT,
+    amount DECIMAL(10, 2),
+    order_date STRING
+)
+USING iceberg;
+
+INSERT INTO orders_non_wap VALUES
+(1, 201, 10.00, '2025-12-01');
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
index 94be847700c..7c208cb7db6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergExecuteActionFactory.java
@@ -39,6 +39,7 @@ public class IcebergExecuteActionFactory {
     public static final String FAST_FORWARD = "fast_forward";
     public static final String EXPIRE_SNAPSHOTS = "expire_snapshots";
     public static final String REWRITE_DATA_FILES = "rewrite_data_files";
+    public static final String PUBLISH_CHANGES = "publish_changes";
 
     /**
      * Create an Iceberg-specific ExecuteAction instance.
@@ -80,6 +81,9 @@ public class IcebergExecuteActionFactory {
             case REWRITE_DATA_FILES:
                 return new IcebergRewriteDataFilesAction(properties, 
partitionNamesInfo,
                         whereCondition);
+            case PUBLISH_CHANGES:
+                return new IcebergPublishChangesAction(properties, 
partitionNamesInfo,
+                        whereCondition);
             default:
                 throw new DdlException("Unsupported Iceberg procedure: " + 
actionType
                         + ". Supported procedures: " + String.join(", ", 
getSupportedActions()));
@@ -99,7 +103,8 @@ public class IcebergExecuteActionFactory {
                 CHERRYPICK_SNAPSHOT,
                 FAST_FORWARD,
                 EXPIRE_SNAPSHOTS,
-                REWRITE_DATA_FILES
+                REWRITE_DATA_FILES,
+                PUBLISH_CHANGES
         };
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergPublishChangesAction.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergPublishChangesAction.java
new file mode 100644
index 00000000000..e1bf8cbdad4
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/action/IcebergPublishChangesAction.java
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.datasource.iceberg.action;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.catalog.Type;
+import org.apache.doris.common.ArgumentParsers;
+import org.apache.doris.common.UserException;
+import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
+import org.apache.doris.info.PartitionNamesInfo;
+import org.apache.doris.nereids.trees.expressions.Expression;
+
+import com.google.common.collect.Lists;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Implements Iceberg's publish_changes action (Core of the WAP pattern).
+ * This action finds a snapshot tagged with a specific 'wap.id' and 
cherry-picks it
+ * into the current table state.
+ * Corresponds to Spark syntax: CALL catalog.system.publish_changes('table', 
'wap_id_123')
+ */
+public class IcebergPublishChangesAction extends BaseIcebergAction {
+    public static final String WAP_ID = "wap_id";
+    private static final String WAP_ID_PROP = "wap.id";
+
+    public IcebergPublishChangesAction(Map<String, String> properties,
+            Optional<PartitionNamesInfo> partitionNamesInfo, 
Optional<Expression> whereCondition) {
+        super("publish_changes", properties, partitionNamesInfo, 
whereCondition);
+    }
+
+    @Override
+    protected void registerIcebergArguments() {
+        namedArguments.registerRequiredArgument(WAP_ID,
+                "The WAP ID matching the snapshot to publish",
+                ArgumentParsers.nonEmptyString(WAP_ID));
+    }
+
+    @Override
+    protected void validateIcebergAction() throws UserException {
+        validateNoPartitions();
+        validateNoWhereCondition();
+    }
+
+    @Override
+    protected List<String> executeAction(TableIf table) throws UserException {
+        Table icebergTable = ((IcebergExternalTable) table).getIcebergTable();
+        String targetWapId = namedArguments.getString(WAP_ID);
+
+        // Find the target WAP snapshot
+        Snapshot wapSnapshot = null;
+        for (Snapshot snapshot : icebergTable.snapshots()) {
+            if (targetWapId.equals(snapshot.summary().get(WAP_ID_PROP))) {
+                wapSnapshot = snapshot;
+                break;
+            }
+        }
+
+        if (wapSnapshot == null) {
+            throw new UserException("Cannot find snapshot with " + WAP_ID_PROP 
+ " = " + targetWapId);
+        }
+
+        long wapSnapshotId = wapSnapshot.snapshotId();
+
+        try {
+            // Get previous snapshot ID for result
+            Snapshot previousSnapshot = icebergTable.currentSnapshot();
+            Long previousSnapshotId = previousSnapshot != null ? 
previousSnapshot.snapshotId() : null;
+
+            // Execute Cherry-pick
+            icebergTable.manageSnapshots().cherrypick(wapSnapshotId).commit();
+
+            // Get current snapshot ID after commit
+            Snapshot currentSnapshot = icebergTable.currentSnapshot();
+            Long currentSnapshotId = currentSnapshot != null ? 
currentSnapshot.snapshotId() : null;
+
+            // Invalidate iceberg catalog table cache
+            
Env.getCurrentEnv().getExtMetaCacheMgr().invalidateTableCache((ExternalTable) 
table);
+
+            String previousSnapshotIdString = previousSnapshotId != null ? 
String.valueOf(previousSnapshotId) : "null";
+            String currentSnapshotIdString = currentSnapshotId != null ? 
String.valueOf(currentSnapshotId) : "null";
+
+            return Lists.newArrayList(
+                    previousSnapshotIdString,
+                    currentSnapshotIdString
+            );
+
+        } catch (Exception e) {
+            throw new UserException("Failed to publish changes for wap.id " + 
targetWapId + ": " + e.getMessage(), e);
+        }
+    }
+
+    @Override
+    protected List<Column> getResultSchema() {
+        return Lists.newArrayList(
+                new Column("previous_snapshot_id", Type.STRING, false,
+                        "ID of the snapshot before the publish operation"),
+                new Column("current_snapshot_id", Type.STRING, false,
+                        "ID of the new snapshot created as a result of the 
publish operation"));
+    }
+
+    @Override
+    public String getDescription() {
+        return "Publish a WAP snapshot by cherry-picking it to the current 
table state";
+    }
+}
diff --git 
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out
 
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out
index a815666d92a..bac2b4e6bf7 100644
--- 
a/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out
+++ 
b/regression-test/data/external_table_p0/iceberg/action/test_iceberg_execute_actions.out
@@ -66,3 +66,9 @@
 2      record2 200
 3      record3 300
 
+-- !wap_before_publish --
+
+-- !wap_after_publish --
+1      103     150.00  2025-12-03
+2      104     320.25  2025-12-04
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
index f84e05de167..00906456633 100644
--- 
a/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
+++ 
b/regression-test/suites/external_table_p0/iceberg/action/test_iceberg_execute_actions.groovy
@@ -636,4 +636,98 @@ suite("test_iceberg_optimize_actions_ddl", 
"p0,external,doris,external_docker,ex
         """
         exception "Action 'expire_snapshots' does not support partition 
specification"
     }
+
+    // 
=====================================================================================
+// Test Case 6: publish_changes action with WAP (Write-Audit-Publish) pattern
+// Simplified workflow:
+//
+//   - Main branch is initially empty (0 rows)
+//   - A WAP snapshot exists with wap.id = "test_wap_001" and 2 rows
+//   - publish_changes should cherry-pick the WAP snapshot into the main branch
+// 
=====================================================================================
+
+logger.info("Starting simplified WAP (Write-Audit-Publish) workflow 
verification test")
+
+// WAP test database and table
+String wap_db = "wap_test"
+String wap_table = "orders_wap"
+
+// Step 1: Verify no data is visible before publish_changes
+logger.info("Step 1: Verifying table is empty before publish_changes")
+qt_wap_before_publish """
+    SELECT order_id, customer_id, amount, order_date
+    FROM ${catalog_name}.${wap_db}.${wap_table}
+    ORDER BY order_id
+"""
+
+// Step 2: Publish the WAP changes with wap_id = "test_wap_001"
+logger.info("Step 2: Publishing WAP changes with wap_id=test_wap_001")
+sql """
+    ALTER TABLE ${catalog_name}.${wap_db}.${wap_table}
+    EXECUTE publish_changes("wap_id" = "test_wap_001")
+"""
+logger.info("Publish changes executed successfully")
+
+// Step 3: Verify WAP data is visible after publish_changes
+logger.info("Step 3: Verifying WAP data is visible after publish_changes")
+qt_wap_after_publish """
+    SELECT order_id, customer_id, amount, order_date
+    FROM ${catalog_name}.${wap_db}.${wap_table}
+    ORDER BY order_id
+"""
+
+logger.info("Simplified WAP (Write-Audit-Publish) workflow verification 
completed successfully")
+
+// Negative tests for publish_changes
+
+// publish_changes on table without write.wap.enabled = true (should fail)
+test {
+    String nonWapDb = "wap_test"
+    String nonWapTable = "orders_non_wap"
+
+    sql """
+    ALTER TABLE ${catalog_name}.${nonWapDb}.${nonWapTable}
+    EXECUTE publish_changes("wap_id" = "test_wap_001")
+    """
+    exception "Cannot find snapshot with wap.id = test_wap_001"
+}
+
+
+// publish_changes with missing wap_id (should fail)
+test {
+    sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+        EXECUTE publish_changes ()
+    """
+    exception "Missing required argument: wap_id"
+}
+
+// publish_changes with invalid wap_id (should fail)
+test {
+    sql """
+    ALTER TABLE ${catalog_name}.${wap_db}.${wap_table}
+    EXECUTE publish_changes("wap_id" = "non_existing_wap_id")
+    """
+    exception "Cannot find snapshot with wap.id = non_existing_wap_id"
+}
+
+// publish_changes with partition specification (should fail)
+test {
+    sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+        EXECUTE publish_changes ("wap_id" = "test_wap_001") PARTITIONS (part1)
+    """
+    exception "Action 'publish_changes' does not support partition 
specification"
+}
+
+// publish_changes with WHERE condition (should fail)
+test {
+    sql """
+        ALTER TABLE ${catalog_name}.${db_name}.${table_name}
+        EXECUTE publish_changes ("wap_id" = "test_wap_001") WHERE id > 0
+    """
+    exception "Action 'publish_changes' does not support WHERE condition"
+}
+
+  
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to