This is an automated email from the ASF dual-hosted git repository.

aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b37f58137 Spark 3.2: Add a procedure to create changelog view (#7036)
3b37f58137 is described below

commit 3b37f581371c1fd70d2a56f5bd900ec7a8436e7e
Author: Yufei Gu <[email protected]>
AuthorDate: Tue Mar 7 20:51:54 2023 -0800

    Spark 3.2: Add a procedure to create changelog view (#7036)
    
    This change backports PR #6012 to Spark 3.2.
---
 .../TestCreateChangelogViewProcedure.java          | 450 +++++++++++++++++++++
 .../iceberg/spark/procedures/BaseProcedure.java    |  10 +
 .../procedures/CreateChangelogViewProcedure.java   | 266 ++++++++++++
 .../iceberg/spark/procedures/SparkProcedures.java  |   1 +
 4 files changed, 727 insertions(+)

diff --git 
a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
new file mode 100644
index 0000000000..dc12b0145d
--- /dev/null
+++ 
b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = 
ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = 
ChangelogOperation.UPDATE_AFTER.name();
+
+  public TestCreateChangelogViewProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  public void createTableWith2Columns() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 
1);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+  }
+
+  private void createTableWith3Columns() {
+    sql("CREATE TABLE %s (id INT, data STRING, age INT) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 
1);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+  }
+
+  private void createTableWithIdentifierField() {
+    sql("CREATE TABLE %s (id INT NOT NULL, data STRING) USING iceberg", 
tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 
1);
+    sql("ALTER TABLE %s SET IDENTIFIER FIELDS id", tableName);
+  }
+
+  @Test
+  public void testCustomizedViewName() {
+    createTableWith2Columns();
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    sql(
+        "CALL %s.system.create_changelog_view("
+            + "table => '%s',"
+            + "options => map('%s','%s','%s','%s'),"
+            + "changelog_view => '%s')",
+        catalogName,
+        tableName,
+        SparkReadOptions.START_SNAPSHOT_ID,
+        snap1.snapshotId(),
+        SparkReadOptions.END_SNAPSHOT_ID,
+        snap2.snapshotId(),
+        "cdc_view");
+
+    long rowCount = sql("select * from %s", "cdc_view").stream().count();
+    Assert.assertEquals(2, rowCount);
+  }
+
+  @Test
+  public void testNoSnapshotIdInput() {
+    createTableWith2Columns();
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(" + "table => '%s')",
+            catalogName, tableName, "cdc_view");
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", viewName));
+  }
+
+  @Test
+  public void testTimestampsBasedQuery() {
+    createTableWith2Columns();
+    long beginning = System.currentTimeMillis();
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+    long afterFirstInsert = waitUntilAfter(snap0.timestampMillis());
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    long afterInsertOverwrite = waitUntilAfter(snap2.timestampMillis());
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', "
+                + "options => map('%s', '%s','%s', '%s'))",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_TIMESTAMP,
+            beginning,
+            SparkReadOptions.END_TIMESTAMP,
+            afterInsertOverwrite);
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", 
returns.get(0)[0]));
+
+    // query the timestamps starting from the second insert
+    returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', "
+                + "options => map('%s', '%s', '%s', '%s'))",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_TIMESTAMP,
+            afterFirstInsert,
+            SparkReadOptions.END_TIMESTAMP,
+            afterInsertOverwrite);
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(-2, "b", INSERT, 1, snap2.snapshotId()),
+            row(2, "b", DELETE, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", 
returns.get(0)[0]));
+  }
+
+  @Test
+  public void testWithCarryovers() {
+    createTableWith2Columns();
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName, "cdc_view");
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, _change_type", 
viewName));
+  }
+
+  @Test
+  public void testUpdate() {
+    createTableWith2Columns();
+    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', 
identifier_columns => array('id'))",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithIdentifierField() {
+    createTableWithIdentifierField();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', 
compute_updates => true)",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithFilter() {
+    createTableWith2Columns();
+    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', 
identifier_columns => array('id'))",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId())),
+        // the predicate on partition columns will filter out the insert of 
(3, 'c') at the planning
+        // phase
+        sql("select * from %s where id != 3 order by _change_ordinal, id, 
data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithMultipleIdentifierColumns() {
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", 
tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'),"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testRemoveCarryOvers() {
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", 
tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", 
tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'), "
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 
1) are removed
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testRemoveCarryOversWithoutUpdatedRows() {
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", 
tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", 
tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql("CALL %s.system.create_changelog_view(table => '%s')", 
catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 
1) are removed, even
+    // though update-row is not computed
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
+            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testNotRemoveCarryOvers() {
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", 
tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", 
tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
+            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
+            // the following two rows are carry-over rows
+            row(2, "e", 12, DELETE, 1, snap2.snapshotId()),
+            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data, 
_change_type", viewName));
+  }
+}
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index 86364dc262..1babe4efae 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -18,6 +18,7 @@
  */
 package org.apache.iceberg.spark.procedures;
 
+import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -31,6 +32,8 @@ import 
org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
 import org.apache.iceberg.spark.actions.SparkActions;
 import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
 import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -49,6 +52,7 @@ import scala.Option;
 abstract class BaseProcedure implements Procedure {
   protected static final DataType STRING_MAP =
       DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);
+  protected static final DataType STRING_ARRAY = 
DataTypes.createArrayType(DataTypes.StringType);
 
   private final SparkSession spark;
   private final TableCatalog tableCatalog;
@@ -144,6 +148,12 @@ abstract class BaseProcedure implements Procedure {
     }
   }
 
  /**
   * Loads a table as a {@code Dataset<Row>} with the given read options applied.
   *
   * @param tableIdent identifier of the table, resolved against this procedure's table catalog
   * @param options Spark read options passed through to the reader unvalidated
   * @return a dataset backed by the fully-qualified table
   */
  protected Dataset<Row> loadDataSetFromTable(Identifier tableIdent, Map<String, String> options) {
    String tableName = Spark3Util.quotedFullIdentifier(tableCatalog().name(), tableIdent);
    // no need to validate the read options here since the reader will validate them
    return spark().read().options(options).table(tableName);
  }
+
   protected void refreshSparkCache(Identifier ident, Table table) {
     CacheManager cacheManager = spark.sharedState().cacheManager();
     DataSourceV2Relation relation =
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
new file mode 100644
index 0000000000..ab844e08e9
--- /dev/null
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to 
keep them, you can set
+ * "remove_carryovers" to be false in the options.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default. If 
you want to compute
+ * them, you can set "compute_updates" to be true in the options.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same 
row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which 
contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would 
require erasing this
+ * file and preserving row1 in a new file. The changelog table would report 
this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an 
actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the 
result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an 
insert row. Identifier
+ * columns are used for determining whether an insert and a delete record 
refer to the same row. If
+ * the two records share the same values for the identity columns they are 
considered to be before
+ * and after states of the same row. You can either set identifier fields in 
the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post 
update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the 
same id:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='DELETE')
+ *   <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ *   <li>(id=1, data='a', op='UPDATE_BEFORE')
+ *   <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+  private static final ProcedureParameter[] PARAMETERS =
+      new ProcedureParameter[] {
+        ProcedureParameter.required("table", DataTypes.StringType),
+        ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+        ProcedureParameter.optional("options", STRING_MAP),
+        ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+        ProcedureParameter.optional("remove_carryovers", 
DataTypes.BooleanType),
+        ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+      };
+
+  private static final int TABLE_NAME_ORDINAL = 0;
+  private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+  private static final int OPTIONS_ORDINAL = 2;
+  private static final int COMPUTE_UPDATES_ORDINAL = 3;
+  private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+  private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+  private static final StructType OUTPUT_TYPE =
+      new StructType(
+          new StructField[] {
+            new StructField("changelog_view", DataTypes.StringType, false, 
Metadata.empty())
+          });
+
+  public static SparkProcedures.ProcedureBuilder builder() {
+    return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+      @Override
+      protected CreateChangelogViewProcedure doBuild() {
+        return new CreateChangelogViewProcedure(tableCatalog());
+      }
+    };
+  }
+
+  private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+    super(tableCatalog);
+  }
+
+  @Override
+  public ProcedureParameter[] parameters() {
+    return PARAMETERS;
+  }
+
+  @Override
+  public StructType outputType() {
+    return OUTPUT_TYPE;
+  }
+
+  @Override
+  public InternalRow[] call(InternalRow args) {
+    Identifier tableIdent =
+        toIdentifier(args.getString(TABLE_NAME_ORDINAL), 
PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+    // load insert and deletes from the changelog table
+    Identifier changelogTableIdent = changelogTableIdent(tableIdent);
+    Dataset<Row> df = loadDataSetFromTable(changelogTableIdent, options(args));
+
+    if (shouldComputeUpdateImages(args)) {
+      df = computeUpdateImages(identifierColumns(args, tableIdent), df);
+    } else if (shouldRemoveCarryoverRows(args)) {
+      df = removeCarryoverRows(df);
+    }
+
+    String viewName = viewName(args, tableIdent.name());
+
+    df.createOrReplaceTempView(viewName);
+
+    return toOutputRows(viewName);
+  }
+
+  private Dataset<Row> computeUpdateImages(String[] identifierColumns, 
Dataset<Row> df) {
+    Preconditions.checkArgument(
+        identifierColumns.length > 0,
+        "Cannot compute the update-rows because identifier columns are not 
set");
+
+    Column[] repartitionColumns = new Column[identifierColumns.length + 1];
+    for (int i = 0; i < identifierColumns.length; i++) {
+      repartitionColumns[i] = df.col(identifierColumns[i]);
+    }
+    repartitionColumns[repartitionColumns.length - 1] =
+        df.col(MetadataColumns.CHANGE_ORDINAL.name());
+
+    return applyChangelogIterator(df, repartitionColumns);
+  }
+
+  private boolean shouldComputeUpdateImages(InternalRow args) {
+    if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+      return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+    } else {
+      // If the identifier columns are set, we compute pre/post update images 
by default.
+      return !args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL);
+    }
+  }
+
+  private boolean shouldRemoveCarryoverRows(InternalRow args) {
+    if (args.isNullAt(REMOVE_CARRYOVERS_ORDINAL)) {
+      return true;
+    } else {
+      return args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+    }
+  }
+
+  private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+    Column[] repartitionColumns =
+        Arrays.stream(df.columns())
+            .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
+            .map(df::col)
+            .toArray(Column[]::new);
+    return applyChangelogIterator(df, repartitionColumns);
+  }
+
+  private String[] identifierColumns(InternalRow args, Identifier tableIdent) {
+    if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+      return Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+          .map(column -> column.toString())
+          .toArray(String[]::new);
+    } else {
+      Table table = loadSparkTable(tableIdent).table();
+      return table.schema().identifierFieldNames().toArray(new String[0]);
+    }
+  }
+
+  private Identifier changelogTableIdent(Identifier tableIdent) {
+    List<String> namespace = Lists.newArrayList();
+    namespace.addAll(Arrays.asList(tableIdent.namespace()));
+    namespace.add(tableIdent.name());
+    return Identifier.of(namespace.toArray(new String[0]), 
SparkChangelogTable.TABLE_NAME);
+  }
+
+  private Map<String, String> options(InternalRow args) {
+    Map<String, String> options = Maps.newHashMap();
+
+    if (!args.isNullAt(OPTIONS_ORDINAL)) {
+      args.getMap(OPTIONS_ORDINAL)
+          .foreach(
+              DataTypes.StringType,
+              DataTypes.StringType,
+              (k, v) -> {
+                options.put(k.toString(), v.toString());
+                return BoxedUnit.UNIT;
+              });
+    }
+
+    return options;
+  }
+
+  private static String viewName(InternalRow args, String tableName) {
+    if (args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)) {
+      return String.format("`%s_changes`", tableName);
+    } else {
+      return args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
+    }
+  }
+
+  private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[] 
repartitionColumns) {
+    Column[] sortSpec = sortSpec(df, repartitionColumns);
+    StructType schema = df.schema();
+    String[] identifierFields =
+        
Arrays.stream(repartitionColumns).map(Column::toString).toArray(String[]::new);
+
+    return df.repartition(repartitionColumns)
+        .sortWithinPartitions(sortSpec)
+        .mapPartitions(
+            (MapPartitionsFunction<Row, Row>)
+                rowIterator -> ChangelogIterator.create(rowIterator, schema, 
identifierFields),
+            RowEncoder.apply(schema));
+  }
+
+  private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec) {
+    Column[] sortSpec = new Column[repartitionSpec.length + 1];
+    System.arraycopy(repartitionSpec, 0, sortSpec, 0, repartitionSpec.length);
+    sortSpec[sortSpec.length - 1] = df.col(MetadataColumns.CHANGE_TYPE.name());
+    return sortSpec;
+  }
+
+  private InternalRow[] toOutputRows(String viewName) {
+    InternalRow row = newInternalRow(UTF8String.fromString(viewName));
+    return new InternalRow[] {row};
+  }
+
+  @Override
+  public String description() {
+    return "CreateChangelogViewProcedure";
+  }
+}
diff --git 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 6d59cb876b..8ee3a95501 100644
--- 
a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ 
b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ public class SparkProcedures {
     mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
     mapBuilder.put("register_table", RegisterTableProcedure::builder);
     mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
+    mapBuilder.put("create_changelog_view", 
CreateChangelogViewProcedure::builder);
     return mapBuilder.build();
   }
 

Reply via email to