This is an automated email from the ASF dual-hosted git repository.
aokolnychyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 3b37f58137 Spark 3.2: Add a procedure to create changelog view (#7036)
3b37f58137 is described below
commit 3b37f581371c1fd70d2a56f5bd900ec7a8436e7e
Author: Yufei Gu <[email protected]>
AuthorDate: Tue Mar 7 20:51:54 2023 -0800
Spark 3.2: Add a procedure to create changelog view (#7036)
This change backports PR #6012 to Spark 3.2.
---
.../TestCreateChangelogViewProcedure.java | 450 +++++++++++++++++++++
.../iceberg/spark/procedures/BaseProcedure.java | 10 +
.../procedures/CreateChangelogViewProcedure.java | 266 ++++++++++++
.../iceberg/spark/procedures/SparkProcedures.java | 1 +
4 files changed, 727 insertions(+)
diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
new file mode 100644
index 0000000000..dc12b0145d
--- /dev/null
+++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestCreateChangelogViewProcedure.java
@@ -0,0 +1,450 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.ChangelogOperation;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.spark.SparkReadOptions;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+/** Tests for the {@code create_changelog_view} stored procedure. */
+public class TestCreateChangelogViewProcedure extends SparkExtensionsTestBase {
+  private static final String DELETE = ChangelogOperation.DELETE.name();
+  private static final String INSERT = ChangelogOperation.INSERT.name();
+  private static final String UPDATE_BEFORE = ChangelogOperation.UPDATE_BEFORE.name();
+  private static final String UPDATE_AFTER = ChangelogOperation.UPDATE_AFTER.name();
+
+  public TestCreateChangelogViewProcedure(
+      String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTable() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  /** Creates a format v1 table with columns (id INT, data STRING), partitioned by data. */
+  public void createTableWith2Columns() {
+    sql("CREATE TABLE %s (id INT, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 1);
+    sql("ALTER TABLE %s ADD PARTITION FIELD data", tableName);
+  }
+
+  /** Creates a format v1 table with columns (id INT, data STRING, age INT), partitioned by id. */
+  private void createTableWith3Columns() {
+    sql("CREATE TABLE %s (id INT, data STRING, age INT) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 1);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+  }
+
+  /** Creates a format v1 table (id INT NOT NULL, data STRING) whose identifier field is id. */
+  private void createTableWithIdentifierField() {
+    sql("CREATE TABLE %s (id INT NOT NULL, data STRING) USING iceberg", tableName);
+    sql("ALTER TABLE %s SET TBLPROPERTIES ('format-version'='%d')", tableName, 1);
+    sql("ALTER TABLE %s SET IDENTIFIER FIELDS id", tableName);
+  }
+
+  @Test
+  public void testCustomizedViewName() {
+    createTableWith2Columns();
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+
+    Table table = validationCatalog.loadTable(tableIdent);
+
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+
+    table.refresh();
+
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "table => '%s',"
+                + "options => map('%s','%s','%s','%s'),"
+                + "changelog_view => '%s')",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_SNAPSHOT_ID,
+            snap1.snapshotId(),
+            SparkReadOptions.END_SNAPSHOT_ID,
+            snap2.snapshotId(),
+            "cdc_view");
+
+    // the procedure must report the user-supplied view name, not the default one
+    Assert.assertEquals("Should return the custom view name", "cdc_view", returns.get(0)[0]);
+
+    List<Object[]> rows = sql("select * from %s", "cdc_view");
+    Assert.assertEquals("Should read 2 changed rows from the custom view", 2, rows.size());
+  }
+
+  @Test
+  public void testNoSnapshotIdInput() {
+    createTableWith2Columns();
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // no changelog_view argument: the procedure picks the default view name
+    List<Object[]> returns =
+        sql("CALL %s.system.create_changelog_view(table => '%s')", catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", viewName));
+  }
+
+  @Test
+  public void testTimestampsBasedQuery() {
+    createTableWith2Columns();
+    long beginning = System.currentTimeMillis();
+
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+    long afterFirstInsert = waitUntilAfter(snap0.timestampMillis());
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    long afterInsertOverwrite = waitUntilAfter(snap2.timestampMillis());
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', "
+                + "options => map('%s', '%s','%s', '%s'))",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_TIMESTAMP,
+            beginning,
+            SparkReadOptions.END_TIMESTAMP,
+            afterInsertOverwrite);
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
+
+    // query the timestamps starting from the second insert
+    returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', "
+                + "options => map('%s', '%s', '%s', '%s'))",
+            catalogName,
+            tableName,
+            SparkReadOptions.START_TIMESTAMP,
+            afterFirstInsert,
+            SparkReadOptions.END_TIMESTAMP,
+            afterInsertOverwrite);
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(-2, "b", INSERT, 1, snap2.snapshotId()),
+            row(2, "b", DELETE, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id", returns.get(0)[0]));
+  }
+
+  @Test
+  public void testWithCarryovers() {
+    createTableWith2Columns();
+    sql("INSERT INTO %s VALUES (1, 'a')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap0 = table.currentSnapshot();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (-2, 'b'), (2, 'b'), (2, 'b')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap0.snapshotId()),
+            row(2, "b", INSERT, 1, snap1.snapshotId()),
+            row(-2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", DELETE, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId()),
+            row(2, "b", INSERT, 2, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, _change_type", viewName));
+  }
+
+  @Test
+  public void testUpdate() {
+    createTableWith2Columns();
+    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithIdentifierField() {
+    createTableWithIdentifierField();
+
+    sql("INSERT INTO %s VALUES (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    // identifier columns come from the table schema because none are passed explicitly
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', compute_updates => true)",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithFilter() {
+    createTableWith2Columns();
+    sql("ALTER TABLE %s DROP PARTITION FIELD data", tableName);
+    sql("ALTER TABLE %s ADD PARTITION FIELD id", tableName);
+
+    sql("INSERT INTO %s VALUES (1, 'a'), (2, 'b')", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c'), (2, 'd')", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view(table => '%s', identifier_columns => array('id'))",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", INSERT, 0, snap1.snapshotId()),
+            row(2, "b", UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", UPDATE_AFTER, 1, snap2.snapshotId())),
+        // the predicate on partition columns will filter out the insert of (3, 'c') at the planning
+        // phase
+        sql("select * from %s where id != 3 order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testUpdateWithMultipleIdentifierColumns() {
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'),"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testRemoveCarryOvers() {
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "identifier_columns => array('id','age'), "
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, UPDATE_BEFORE, 1, snap2.snapshotId()),
+            row(2, "d", 11, UPDATE_AFTER, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testRemoveCarryOversWithoutUpdatedRows() {
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql("CALL %s.system.create_changelog_view(table => '%s')", catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+
+    // the carry-over rows (2, 'e', 12, 'DELETE', 1), (2, 'e', 12, 'INSERT', 1) are removed, even
+    // though update-row is not computed
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
+            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data", viewName));
+  }
+
+  @Test
+  public void testNotRemoveCarryOvers() {
+    createTableWith3Columns();
+
+    sql("INSERT INTO %s VALUES (1, 'a', 12), (2, 'b', 11), (2, 'e', 12)", tableName);
+    Table table = validationCatalog.loadTable(tableIdent);
+    Snapshot snap1 = table.currentSnapshot();
+
+    // carry-over row (2, 'e', 12)
+    sql("INSERT OVERWRITE %s VALUES (3, 'c', 13), (2, 'd', 11), (2, 'e', 12)", tableName);
+    table.refresh();
+    Snapshot snap2 = table.currentSnapshot();
+
+    List<Object[]> returns =
+        sql(
+            "CALL %s.system.create_changelog_view("
+                + "remove_carryovers => false,"
+                + "table => '%s')",
+            catalogName, tableName);
+
+    String viewName = (String) returns.get(0)[0];
+
+    assertEquals(
+        "Rows should match",
+        ImmutableList.of(
+            row(1, "a", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, INSERT, 0, snap1.snapshotId()),
+            row(2, "e", 12, INSERT, 0, snap1.snapshotId()),
+            row(2, "b", 11, DELETE, 1, snap2.snapshotId()),
+            row(2, "d", 11, INSERT, 1, snap2.snapshotId()),
+            // the following two rows are carry-over rows
+            row(2, "e", 12, DELETE, 1, snap2.snapshotId()),
+            row(2, "e", 12, INSERT, 1, snap2.snapshotId()),
+            row(3, "c", 13, INSERT, 1, snap2.snapshotId())),
+        sql("select * from %s order by _change_ordinal, id, data, _change_type", viewName));
+  }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
index 86364dc262..1babe4efae 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/BaseProcedure.java
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.spark.procedures;
+import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
@@ -31,6 +32,8 @@ import org.apache.iceberg.spark.Spark3Util.CatalogAndIdentifier;
import org.apache.iceberg.spark.actions.SparkActions;
import org.apache.iceberg.spark.procedures.SparkProcedures.ProcedureBuilder;
import org.apache.iceberg.spark.source.SparkTable;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException;
@@ -49,6 +52,7 @@ import scala.Option;
abstract class BaseProcedure implements Procedure {
protected static final DataType STRING_MAP =
DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType);
+ /** Spark SQL array-of-strings type, e.g. for procedure parameters that take a list of names. */
+ protected static final DataType STRING_ARRAY =
DataTypes.createArrayType(DataTypes.StringType);
private final SparkSession spark;
private final TableCatalog tableCatalog;
@@ -144,6 +148,12 @@ abstract class BaseProcedure implements Procedure {
}
}
+  /**
+   * Reads a table of this procedure's catalog into a DataFrame.
+   *
+   * @param tableIdent identifier of the table inside {@code tableCatalog()}
+   * @param options read options forwarded verbatim to the DataFrame reader
+   * @return a DataFrame scanning the table with the given options
+   */
+  protected Dataset<Row> loadDataSetFromTable(Identifier tableIdent, Map<String, String> options) {
+    String catalogName = tableCatalog().name();
+    String quotedTableName = Spark3Util.quotedFullIdentifier(catalogName, tableIdent);
+    // options are deliberately not checked here: the reader rejects invalid ones itself
+    return spark().read().options(options).table(quotedTableName);
+  }
+
protected void refreshSparkCache(Identifier ident, Table table) {
CacheManager cacheManager = spark.sharedState().cacheManager();
DataSourceV2Relation relation =
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
new file mode 100644
index 0000000000..ab844e08e9
--- /dev/null
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/CreateChangelogViewProcedure.java
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.procedures;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import org.apache.iceberg.MetadataColumns;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.spark.ChangelogIterator;
+import org.apache.iceberg.spark.source.SparkChangelogTable;
+import org.apache.spark.api.java.function.MapPartitionsFunction;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.encoders.RowEncoder;
+import org.apache.spark.sql.connector.catalog.Identifier;
+import org.apache.spark.sql.connector.catalog.TableCatalog;
+import org.apache.spark.sql.connector.iceberg.catalog.ProcedureParameter;
+import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.UTF8String;
+import scala.runtime.BoxedUnit;
+
+/**
+ * A procedure that creates a view for changed rows.
+ *
+ * <p>The procedure removes the carry-over rows by default. If you want to
keep them, you can set
+ * "remove_carryovers" to be false in the options.
+ *
+ * <p>The procedure doesn't compute the pre/post update images by default. If
you want to compute
+ * them, you can set "compute_updates" to be true in the options.
+ *
+ * <p>Carry-over rows are the result of a removal and insertion of the same
row within an operation
+ * because of the copy-on-write mechanism. For example, given a file which
contains row1 (id=1,
+ * data='a') and row2 (id=2, data='b'). A copy-on-write delete of row2 would
require erasing this
+ * file and preserving row1 in a new file. The changelog table would report
this as (id=1, data='a',
+ * op='DELETE') and (id=1, data='a', op='INSERT'), despite it not being an
actual change to the
+ * table. The procedure finds the carry-over rows and removes them from the
result.
+ *
+ * <p>Pre/post update images are converted from a pair of a delete row and an
insert row. Identifier
+ * columns are used for determining whether an insert and a delete record
refer to the same row. If
+ * the two records share the same values for the identity columns they are
considered to be before
+ * and after states of the same row. You can either set identifier fields in
the table schema or
+ * input them as the procedure parameters. Here is an example of pre/post
update images with an
+ * identifier column(id). A pair of a delete row and an insert row with the
same id:
+ *
+ * <ul>
+ * <li>(id=1, data='a', op='DELETE')
+ * <li>(id=1, data='b', op='INSERT')
+ * </ul>
+ *
+ * <p>will be marked as pre/post update images:
+ *
+ * <ul>
+ * <li>(id=1, data='a', op='UPDATE_BEFORE')
+ * <li>(id=1, data='b', op='UPDATE_AFTER')
+ * </ul>
+ */
+public class CreateChangelogViewProcedure extends BaseProcedure {
+
+ private static final ProcedureParameter[] PARAMETERS =
+ new ProcedureParameter[] {
+ ProcedureParameter.required("table", DataTypes.StringType),
+ ProcedureParameter.optional("changelog_view", DataTypes.StringType),
+ ProcedureParameter.optional("options", STRING_MAP),
+ ProcedureParameter.optional("compute_updates", DataTypes.BooleanType),
+ ProcedureParameter.optional("remove_carryovers",
DataTypes.BooleanType),
+ ProcedureParameter.optional("identifier_columns", STRING_ARRAY),
+ };
+
+ private static final int TABLE_NAME_ORDINAL = 0;
+ private static final int CHANGELOG_VIEW_NAME_ORDINAL = 1;
+ private static final int OPTIONS_ORDINAL = 2;
+ private static final int COMPUTE_UPDATES_ORDINAL = 3;
+ private static final int REMOVE_CARRYOVERS_ORDINAL = 4;
+ private static final int IDENTIFIER_COLUMNS_ORDINAL = 5;
+
+ private static final StructType OUTPUT_TYPE =
+ new StructType(
+ new StructField[] {
+ new StructField("changelog_view", DataTypes.StringType, false,
Metadata.empty())
+ });
+
+ public static SparkProcedures.ProcedureBuilder builder() {
+ return new BaseProcedure.Builder<CreateChangelogViewProcedure>() {
+ @Override
+ protected CreateChangelogViewProcedure doBuild() {
+ return new CreateChangelogViewProcedure(tableCatalog());
+ }
+ };
+ }
+
+ private CreateChangelogViewProcedure(TableCatalog tableCatalog) {
+ super(tableCatalog);
+ }
+
+ @Override
+ public ProcedureParameter[] parameters() {
+ return PARAMETERS;
+ }
+
+ @Override
+ public StructType outputType() {
+ return OUTPUT_TYPE;
+ }
+
+ @Override
+ public InternalRow[] call(InternalRow args) {
+ Identifier tableIdent =
+ toIdentifier(args.getString(TABLE_NAME_ORDINAL),
PARAMETERS[TABLE_NAME_ORDINAL].name());
+
+ // load insert and deletes from the changelog table
+ Identifier changelogTableIdent = changelogTableIdent(tableIdent);
+ Dataset<Row> df = loadDataSetFromTable(changelogTableIdent, options(args));
+
+ if (shouldComputeUpdateImages(args)) {
+ df = computeUpdateImages(identifierColumns(args, tableIdent), df);
+ } else if (shouldRemoveCarryoverRows(args)) {
+ df = removeCarryoverRows(df);
+ }
+
+ String viewName = viewName(args, tableIdent.name());
+
+ df.createOrReplaceTempView(viewName);
+
+ return toOutputRows(viewName);
+ }
+
+ private Dataset<Row> computeUpdateImages(String[] identifierColumns,
Dataset<Row> df) {
+ Preconditions.checkArgument(
+ identifierColumns.length > 0,
+ "Cannot compute the update-rows because identifier columns are not
set");
+
+ Column[] repartitionColumns = new Column[identifierColumns.length + 1];
+ for (int i = 0; i < identifierColumns.length; i++) {
+ repartitionColumns[i] = df.col(identifierColumns[i]);
+ }
+ repartitionColumns[repartitionColumns.length - 1] =
+ df.col(MetadataColumns.CHANGE_ORDINAL.name());
+
+ return applyChangelogIterator(df, repartitionColumns);
+ }
+
+ private boolean shouldComputeUpdateImages(InternalRow args) {
+ if (!args.isNullAt(COMPUTE_UPDATES_ORDINAL)) {
+ return args.getBoolean(COMPUTE_UPDATES_ORDINAL);
+ } else {
+ // If the identifier columns are set, we compute pre/post update images
by default.
+ return !args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL);
+ }
+ }
+
+ private boolean shouldRemoveCarryoverRows(InternalRow args) {
+ if (args.isNullAt(REMOVE_CARRYOVERS_ORDINAL)) {
+ return true;
+ } else {
+ return args.getBoolean(REMOVE_CARRYOVERS_ORDINAL);
+ }
+ }
+
+ private Dataset<Row> removeCarryoverRows(Dataset<Row> df) {
+ Column[] repartitionColumns =
+ Arrays.stream(df.columns())
+ .filter(c -> !c.equals(MetadataColumns.CHANGE_TYPE.name()))
+ .map(df::col)
+ .toArray(Column[]::new);
+ return applyChangelogIterator(df, repartitionColumns);
+ }
+
+ private String[] identifierColumns(InternalRow args, Identifier tableIdent) {
+ if (!args.isNullAt(IDENTIFIER_COLUMNS_ORDINAL)) {
+ return Arrays.stream(args.getArray(IDENTIFIER_COLUMNS_ORDINAL).array())
+ .map(column -> column.toString())
+ .toArray(String[]::new);
+ } else {
+ Table table = loadSparkTable(tableIdent).table();
+ return table.schema().identifierFieldNames().toArray(new String[0]);
+ }
+ }
+
+ private Identifier changelogTableIdent(Identifier tableIdent) {
+ List<String> namespace = Lists.newArrayList();
+ namespace.addAll(Arrays.asList(tableIdent.namespace()));
+ namespace.add(tableIdent.name());
+ return Identifier.of(namespace.toArray(new String[0]),
SparkChangelogTable.TABLE_NAME);
+ }
+
+ private Map<String, String> options(InternalRow args) {
+ Map<String, String> options = Maps.newHashMap();
+
+ if (!args.isNullAt(OPTIONS_ORDINAL)) {
+ args.getMap(OPTIONS_ORDINAL)
+ .foreach(
+ DataTypes.StringType,
+ DataTypes.StringType,
+ (k, v) -> {
+ options.put(k.toString(), v.toString());
+ return BoxedUnit.UNIT;
+ });
+ }
+
+ return options;
+ }
+
+ private static String viewName(InternalRow args, String tableName) {
+ if (args.isNullAt(CHANGELOG_VIEW_NAME_ORDINAL)) {
+ return String.format("`%s_changes`", tableName);
+ } else {
+ return args.getString(CHANGELOG_VIEW_NAME_ORDINAL);
+ }
+ }
+
+ private Dataset<Row> applyChangelogIterator(Dataset<Row> df, Column[]
repartitionColumns) {
+ Column[] sortSpec = sortSpec(df, repartitionColumns);
+ StructType schema = df.schema();
+ String[] identifierFields =
+
Arrays.stream(repartitionColumns).map(Column::toString).toArray(String[]::new);
+
+ return df.repartition(repartitionColumns)
+ .sortWithinPartitions(sortSpec)
+ .mapPartitions(
+ (MapPartitionsFunction<Row, Row>)
+ rowIterator -> ChangelogIterator.create(rowIterator, schema,
identifierFields),
+ RowEncoder.apply(schema));
+ }
+
+ private static Column[] sortSpec(Dataset<Row> df, Column[] repartitionSpec) {
+ Column[] sortSpec = new Column[repartitionSpec.length + 1];
+ System.arraycopy(repartitionSpec, 0, sortSpec, 0, repartitionSpec.length);
+ sortSpec[sortSpec.length - 1] = df.col(MetadataColumns.CHANGE_TYPE.name());
+ return sortSpec;
+ }
+
+ private InternalRow[] toOutputRows(String viewName) {
+ InternalRow row = newInternalRow(UTF8String.fromString(viewName));
+ return new InternalRow[] {row};
+ }
+
+ @Override
+ public String description() {
+ return "CreateChangelogViewProcedure";
+ }
+}
diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
index 6d59cb876b..8ee3a95501 100644
--- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
+++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/procedures/SparkProcedures.java
@@ -53,6 +53,7 @@ public class SparkProcedures {
mapBuilder.put("ancestors_of", AncestorsOfProcedure::builder);
mapBuilder.put("register_table", RegisterTableProcedure::builder);
mapBuilder.put("publish_changes", PublishChangesProcedure::builder);
+ mapBuilder.put("create_changelog_view",
CreateChangelogViewProcedure::builder);
return mapBuilder.build();
}