This is an automated email from the ASF dual-hosted git repository.
etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/main by this push:
new 793c8d05ce Spark: Clarify schema behavior when working with branches
(#10055)
793c8d05ce is described below
commit 793c8d05cee9e4a95ffe2b94e31bef5a617e8c85
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Sat Mar 30 09:07:45 2024 +0100
Spark: Clarify schema behavior when working with branches (#10055)
---
docs/docs/branching.md | 91 ++++++++++++++++++++++
docs/docs/spark-queries.md | 23 +++++-
docs/docs/spark-writes.md | 4 +
.../org/apache/iceberg/spark/sql/TestSelect.java | 59 ++++++++++++++
4 files changed, 176 insertions(+), 1 deletion(-)
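
Reader's note: the rule documented by the changes to branching.md and spark-queries.md in this commit can be condensed into a small lookup. The Java sketch below is illustrative only. `SchemaSelectionSketch`, `Target`, `SchemaSource`, and `schemaFor` are hypothetical names, not part of Iceberg's API, and the sketch assumes only the four read targets listed in the spark-queries.md change.

```java
/** Illustrative model of the documented rule; hypothetical, not Iceberg code. */
final class SchemaSelectionSketch {

  /** The kinds of time-travel target covered by the documentation change. */
  enum Target { BRANCH, TAG, SNAPSHOT_ID, TIMESTAMP }

  /** Which schema a read against that target is resolved with. */
  enum SchemaSource { TABLE_SCHEMA, SNAPSHOT_SCHEMA }

  static SchemaSource schemaFor(Target target) {
    // Only the head of a branch is read with the table's current schema.
    // Tags, snapshot ids, and timestamps use the schema of the snapshot
    // they resolve to.
    return target == Target.BRANCH
        ? SchemaSource.TABLE_SCHEMA
        : SchemaSource.SNAPSHOT_SCHEMA;
  }

  public static void main(String[] args) {
    // Print the mapping for every target kind.
    for (Target target : Target.values()) {
      System.out.println(target + " -> " + schemaFor(target));
    }
  }
}
```

Writes to a branch are validated against the table's schema as well, so they would sit alongside `BRANCH` in this model.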
diff --git a/docs/docs/branching.md b/docs/docs/branching.md
index e944e4eb3b..3379264d8a 100644
--- a/docs/docs/branching.md
+++ b/docs/docs/branching.md
@@ -113,3 +113,94 @@ Creating, querying and writing to branches and tags are
supported in the Iceberg
- [Spark Branch Writes](spark-writes.md#writing-to-branches)
- [Flink Reads](flink-queries.md#reading-branches-and-tags-with-SQL)
- [Flink Branch Writes](flink-writes.md#branch-writes)
+
+
+## Schema selection with branches and tags
+
+It is important to understand that the schema tracked for a table is valid
across all branches.
+When working with branches, the table's current schema is used, because that is the schema validated when writing data to a branch.
+On the other hand, querying a tag uses the snapshot's schema, which is the schema (identified by its schema id) that the snapshot referenced when it was created.
+
+The examples below show which schema is used when working with branches.
+
+Create a table and insert some data:
+
+```sql
+CREATE TABLE db.table (id bigint, data string, col float);
+INSERT INTO db.table values (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', 3.0);
+SELECT * FROM db.table;
+1 a 1.0
+2 b 2.0
+3 c 3.0
+```
+
+Create a branch `test_branch` that points to the current snapshot and read
data from the branch:
+
+```sql
+ALTER TABLE db.table CREATE BRANCH test_branch;
+
+SELECT * FROM db.table.branch_test_branch;
+1 a 1.0
+2 b 2.0
+3 c 3.0
+```
+
+Modify the table's schema by dropping the `col` column and adding a new column
named `new_col`:
+
+```sql
+ALTER TABLE db.table drop column col;
+
+ALTER TABLE db.table add column new_col date;
+
+INSERT INTO db.table values (4, 'd', date('2024-04-04')), (5, 'e',
date('2024-05-05'));
+
+SELECT * FROM db.table;
+1 a NULL
+2 b NULL
+3 c NULL
+4 d 2024-04-04
+5 e 2024-05-05
+```
+
+Querying the head of the branch using one of the below statements will return
data using the **table's schema**:
+
+```sql
+SELECT * FROM db.table.branch_test_branch;
+1 a NULL
+2 b NULL
+3 c NULL
+
+SELECT * FROM db.table VERSION AS OF 'test_branch';
+1 a NULL
+2 b NULL
+3 c NULL
+```
+
+Performing a time travel query using the snapshot id uses the **snapshot's
schema**:
+
+```sql
+
+SELECT * FROM db.table.refs;
+test_branch BRANCH 8109744798576441359 NULL NULL NULL
+main BRANCH 6910357365743665710 NULL NULL NULL
+
+
+SELECT * FROM db.table VERSION AS OF 8109744798576441359;
+1 a 1.0
+2 b 2.0
+3 c 3.0
+```
+
+When writing to the branch, the **table's schema** is used for validation:
+
+```sql
+
+INSERT INTO db.table.branch_test_branch values (6, 'e', date('2024-06-06')),
(7, 'g', date('2024-07-07'));
+
+SELECT * FROM db.table.branch_test_branch;
+6 e 2024-06-06
+7 g 2024-07-07
+1 a NULL
+2 b NULL
+3 c NULL
+```
diff --git a/docs/docs/spark-queries.md b/docs/docs/spark-queries.md
index 536c136d7e..b606d849a6 100644
--- a/docs/docs/spark-queries.md
+++ b/docs/docs/spark-queries.md
@@ -82,7 +82,7 @@ The `VERSION AS OF` clause can contain a long snapshot ID or
a string branch or
If this is not desired, rename the tag or branch with a well-defined
prefix such as 'snapshot-1'.
-```sql
+```sql
-- time travel to October 26, 1986 at 01:21:00
SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
@@ -124,6 +124,27 @@ SELECT * FROM prod.db.table.`tag_historical-snapshot`;
Note that the identifier with branch or tag may not be used in combination
with `VERSION AS OF`.
+
+#### Schema selection in time travel queries
+
+The different time travel queries mentioned in the previous section can use
either the snapshot's schema or the table's schema:
+
+```sql
+-- time travel to October 26, 1986 at 01:21:00 -> uses the snapshot's schema
+SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
+
+-- time travel to snapshot with id 10963874102873L -> uses the snapshot's
schema
+SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
+
+-- time travel to the head of audit-branch -> uses the table's schema
+SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
+SELECT * FROM prod.db.table.`branch_audit-branch`;
+
+-- time travel to the snapshot referenced by the tag historical-snapshot ->
uses the snapshot's schema
+SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
+SELECT * FROM prod.db.table.`tag_historical-snapshot`;
+```
+
#### DataFrame
To select a specific table snapshot or the snapshot at some time in the
DataFrame API, Iceberg supports four Spark read options:
diff --git a/docs/docs/spark-writes.md b/docs/docs/spark-writes.md
index 626dee6c96..96fcc5f7ce 100644
--- a/docs/docs/spark-writes.md
+++ b/docs/docs/spark-writes.md
@@ -201,6 +201,10 @@ Note WAP branch and branch identifier cannot both be
specified.
Also, the branch must exist before performing the write.
The operation does **not** create the branch if it does not exist.
For more information on branches please refer to [branches](branching.md).
+
+!!! info
+ Note: When writing to a branch, the current schema of the table will be
used for validation.
+
```sql
-- INSERT (1,' a') (2, 'b') into the audit branch.
diff --git
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 80d7d8787e..4c99a38d29 100644
---
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -23,6 +23,7 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assumptions.assumeThat;
import java.text.SimpleDateFormat;
+import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -348,6 +349,64 @@ public class TestSelect extends CatalogTestBase {
assertEquals("Snapshot at specific branch reference name", expected,
fromDF);
}
+ @TestTemplate
+ public void readAndWriteWithBranchAfterSchemaChange() {
+ Table table = validationCatalog.loadTable(tableIdent);
+ String branchName = "test_branch";
+ table.manageSnapshots().createBranch(branchName,
table.currentSnapshot().snapshotId()).commit();
+
+ List<Object[]> expected =
+ Arrays.asList(row(1L, "a", 1.0f), row(2L, "b", 2.0f), row(3L, "c",
Float.NaN));
+ assertThat(sql("SELECT * FROM %s",
tableName)).containsExactlyElementsOf(expected);
+
+ // change schema on the table and add more data
+ sql("ALTER TABLE %s DROP COLUMN float", tableName);
+ sql("ALTER TABLE %s ADD COLUMN new_col date", tableName);
+ sql(
+ "INSERT INTO %s VALUES (4, 'd', date('2024-04-04')), (5, 'e',
date('2024-05-05'))",
+ tableName);
+
+ // time-travel query using snapshot id should return the snapshot's schema
+ long branchSnapshotId = table.refs().get(branchName).snapshotId();
+ assertThat(sql("SELECT * FROM %s VERSION AS OF %s", tableName,
branchSnapshotId))
+ .containsExactlyElementsOf(expected);
+
+ // querying the head of the branch should return the table's schema
+ assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName,
branchName))
+ .containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c",
null));
+
+ if (!"spark_catalog".equals(catalogName)) {
+ // querying the head of the branch using 'branch_' should return the
table's schema
+ assertThat(sql("SELECT * FROM %s.branch_%s", tableName, branchName))
+ .containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L,
"c", null));
+ }
+
+ // writing to a branch uses the table's schema
+ sql(
+ "INSERT INTO %s.branch_%s VALUES (6L, 'f', cast('2023-06-06' as
date)), (7L, 'g', cast('2023-07-07' as date))",
+ tableName, branchName);
+
+ // querying the head of the branch returns the table's schema
+ assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName,
branchName))
+ .containsExactlyInAnyOrder(
+ row(1L, "a", null),
+ row(2L, "b", null),
+ row(3L, "c", null),
+ row(6L, "f", java.sql.Date.valueOf("2023-06-06")),
+ row(7L, "g", java.sql.Date.valueOf("2023-07-07")));
+
+ // using DataFrameReader with the 'branch' option should return the
table's schema
+ Dataset<Row> df =
+ spark.read().format("iceberg").option(SparkReadOptions.BRANCH,
branchName).load(tableName);
+ assertThat(rowsToJava(df.collectAsList()))
+ .containsExactlyInAnyOrder(
+ row(1L, "a", null),
+ row(2L, "b", null),
+ row(3L, "c", null),
+ row(6L, "f", java.sql.Date.valueOf("2023-06-06")),
+ row(7L, "g", java.sql.Date.valueOf("2023-07-07")));
+ }
+
@TestTemplate
public void testUnknownReferenceAsOf() {
assertThatThrownBy(() -> sql("SELECT * FROM %s VERSION AS OF
'test_unknown'", tableName))