This is an automated email from the ASF dual-hosted git repository.

etudenhoefner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/main by this push:
     new 793c8d05ce Spark: Clarify schema behavior when working with branches 
(#10055)
793c8d05ce is described below

commit 793c8d05cee9e4a95ffe2b94e31bef5a617e8c85
Author: Eduard Tudenhoefner <[email protected]>
AuthorDate: Sat Mar 30 09:07:45 2024 +0100

    Spark: Clarify schema behavior when working with branches (#10055)
---
 docs/docs/branching.md                             | 91 ++++++++++++++++++++++
 docs/docs/spark-queries.md                         | 23 +++++-
 docs/docs/spark-writes.md                          |  4 +
 .../org/apache/iceberg/spark/sql/TestSelect.java   | 59 ++++++++++++++
 4 files changed, 176 insertions(+), 1 deletion(-)

diff --git a/docs/docs/branching.md b/docs/docs/branching.md
index e944e4eb3b..3379264d8a 100644
--- a/docs/docs/branching.md
+++ b/docs/docs/branching.md
@@ -113,3 +113,94 @@ Creating, querying and writing to branches and tags are 
supported in the Iceberg
 - [Spark Branch Writes](spark-writes.md#writing-to-branches)
 - [Flink Reads](flink-queries.md#reading-branches-and-tags-with-SQL)
 - [Flink Branch Writes](flink-writes.md#branch-writes)
+
+
+## Schema selection with branches and tags
+
+It is important to understand that the schema tracked for a table is valid 
across all branches.
+When working with branches, the table's schema is used as that's the schema 
being validated when writing data to a branch.
+On the other hand, querying a tag uses the snapshot's schema, which is the 
schema id that the snapshot pointed to when the snapshot was created.
+
+The below examples show which schema is being used when working with branches.
+
+Create a table and insert some data:
+
+```sql
+CREATE TABLE db.table (id bigint, data string, col float);
+INSERT INTO db.table values (1, 'a', 1.0), (2, 'b', 2.0), (3, 'c', 3.0);
+SELECT * FROM db.table;
+1      a       1.0
+2      b       2.0
+3      c       3.0
+```
+
+Create a branch `test_branch` that points to the current snapshot and read 
data from the branch:
+
+```sql
+ALTER TABLE db.table CREATE BRANCH test_branch;
+
+SELECT * FROM db.table.branch_test_branch;
+1      a       1.0
+2      b       2.0
+3      c       3.0
+```
+
+Modify the table's schema by dropping the `col` column and adding a new column 
named `new_col`:
+
+```sql
+ALTER TABLE db.table drop column col;
+
+ALTER TABLE db.table add column new_col date;
+
+INSERT INTO db.table values (4, 'd', date('2024-04-04')), (5, 'e', 
date('2024-05-05'));
+
+SELECT * FROM db.table;
+1      a       NULL
+2      b       NULL
+3      c       NULL
+4      d       2024-04-04
+5      e       2024-05-05
+```
+
+Querying the head of the branch using one of the below statements will return 
data using the **table's schema**:
+
+```sql
+SELECT * FROM db.table.branch_test_branch;
+1      a       NULL
+2      b       NULL
+3      c       NULL
+
+SELECT * FROM db.table VERSION AS OF 'test_branch';
+1      a       NULL
+2      b       NULL
+3      c       NULL
+```
+
+Performing a time travel query using the snapshot id uses the **snapshot's 
schema**:
+
+```sql
+
+SELECT * FROM db.table.refs;
+test_branch    BRANCH  8109744798576441359     NULL    NULL    NULL
+main           BRANCH  6910357365743665710     NULL    NULL    NULL
+
+
+SELECT * FROM db.table VERSION AS OF 8109744798576441359;
+1      a       1.0
+2      b       2.0
+3      c       3.0
+```
+
+When writing to the branch, the **table's schema** is used for validation:
+
+```sql
+
+INSERT INTO db.table.branch_test_branch values (6, 'f', date('2024-06-06')), 
(7, 'g', date('2024-07-07'));
+
+SELECT * FROM db.table.branch_test_branch;
+6      f       2024-06-06
+7      g       2024-07-07
+1      a       NULL
+2      b       NULL
+3      c       NULL
+```
diff --git a/docs/docs/spark-queries.md b/docs/docs/spark-queries.md
index 536c136d7e..b606d849a6 100644
--- a/docs/docs/spark-queries.md
+++ b/docs/docs/spark-queries.md
@@ -82,7 +82,7 @@ The `VERSION AS OF` clause can contain a long snapshot ID or 
a string branch or
     If this is not desired, rename the tag or branch with a well-defined 
prefix such as 'snapshot-1'.
 
 
-```sql 
+```sql
 -- time travel to October 26, 1986 at 01:21:00
 SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
 
@@ -124,6 +124,27 @@ SELECT * FROM prod.db.table.`tag_historical-snapshot`;
 
 Note that the identifier with branch or tag may not be used in combination 
with `VERSION AS OF`.
 
+
+#### Schema selection in time travel queries
+
+The different time travel queries mentioned in the previous section can use 
either the snapshot's schema or the table's schema:
+
+```sql
+-- time travel to October 26, 1986 at 01:21:00 -> uses the snapshot's schema
+SELECT * FROM prod.db.table TIMESTAMP AS OF '1986-10-26 01:21:00';
+
+-- time travel to snapshot with id 10963874102873L -> uses the snapshot's 
schema
+SELECT * FROM prod.db.table VERSION AS OF 10963874102873;
+
+-- time travel to the head of audit-branch -> uses the table's schema
+SELECT * FROM prod.db.table VERSION AS OF 'audit-branch';
+SELECT * FROM prod.db.table.`branch_audit-branch`;
+
+-- time travel to the snapshot referenced by the tag historical-snapshot -> 
uses the snapshot's schema
+SELECT * FROM prod.db.table VERSION AS OF 'historical-snapshot';
+SELECT * FROM prod.db.table.`tag_historical-snapshot`;
+```
+
 #### DataFrame
 
 To select a specific table snapshot or the snapshot at some time in the 
DataFrame API, Iceberg supports four Spark read options:
diff --git a/docs/docs/spark-writes.md b/docs/docs/spark-writes.md
index 626dee6c96..96fcc5f7ce 100644
--- a/docs/docs/spark-writes.md
+++ b/docs/docs/spark-writes.md
@@ -201,6 +201,10 @@ Note WAP branch and branch identifier cannot both be 
specified.
 Also, the branch must exist before performing the write. 
 The operation does **not** create the branch if it does not exist. 
 For more information on branches please refer to [branches](branching.md).
+
+!!! info
+    Note: When writing to a branch, the current schema of the table will be 
used for validation.
+
  
 ```sql
 -- INSERT (1,' a') (2, 'b') into the audit branch.
diff --git 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
index 80d7d8787e..4c99a38d29 100644
--- 
a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
+++ 
b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/sql/TestSelect.java
@@ -23,6 +23,7 @@ import static 
org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.assertj.core.api.Assumptions.assumeThat;
 
 import java.text.SimpleDateFormat;
+import java.util.Arrays;
 import java.util.Date;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
@@ -348,6 +349,64 @@ public class TestSelect extends CatalogTestBase {
     assertEquals("Snapshot at specific branch reference name", expected, 
fromDF);
   }
 
+  @TestTemplate
+  public void readAndWriteWithBranchAfterSchemaChange() {
+    Table table = validationCatalog.loadTable(tableIdent);
+    String branchName = "test_branch";
+    table.manageSnapshots().createBranch(branchName, 
table.currentSnapshot().snapshotId()).commit();
+
+    List<Object[]> expected =
+        Arrays.asList(row(1L, "a", 1.0f), row(2L, "b", 2.0f), row(3L, "c", 
Float.NaN));
+    assertThat(sql("SELECT * FROM %s", 
tableName)).containsExactlyElementsOf(expected);
+
+    // change schema on the table and add more data
+    sql("ALTER TABLE %s DROP COLUMN float", tableName);
+    sql("ALTER TABLE %s ADD COLUMN new_col date", tableName);
+    sql(
+        "INSERT INTO %s VALUES (4, 'd', date('2024-04-04')), (5, 'e', 
date('2024-05-05'))",
+        tableName);
+
+    // time-travel query using snapshot id should return the snapshot's schema
+    long branchSnapshotId = table.refs().get(branchName).snapshotId();
+    assertThat(sql("SELECT * FROM %s VERSION AS OF %s", tableName, 
branchSnapshotId))
+        .containsExactlyElementsOf(expected);
+
+    // querying the head of the branch should return the table's schema
+    assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, 
branchName))
+        .containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, "c", 
null));
+
+    if (!"spark_catalog".equals(catalogName)) {
+      // querying the head of the branch using 'branch_' should return the 
table's schema
+      assertThat(sql("SELECT * FROM %s.branch_%s", tableName, branchName))
+          .containsExactly(row(1L, "a", null), row(2L, "b", null), row(3L, 
"c", null));
+    }
+
+    // writing to a branch uses the table's schema
+    sql(
+        "INSERT INTO %s.branch_%s VALUES (6L, 'f', cast('2023-06-06' as 
date)), (7L, 'g', cast('2023-07-07' as date))",
+        tableName, branchName);
+
+    // querying the head of the branch returns the table's schema
+    assertThat(sql("SELECT * FROM %s VERSION AS OF '%s'", tableName, 
branchName))
+        .containsExactlyInAnyOrder(
+            row(1L, "a", null),
+            row(2L, "b", null),
+            row(3L, "c", null),
+            row(6L, "f", java.sql.Date.valueOf("2023-06-06")),
+            row(7L, "g", java.sql.Date.valueOf("2023-07-07")));
+
+    // using DataFrameReader with the 'branch' option should return the 
table's schema
+    Dataset<Row> df =
+        spark.read().format("iceberg").option(SparkReadOptions.BRANCH, 
branchName).load(tableName);
+    assertThat(rowsToJava(df.collectAsList()))
+        .containsExactlyInAnyOrder(
+            row(1L, "a", null),
+            row(2L, "b", null),
+            row(3L, "c", null),
+            row(6L, "f", java.sql.Date.valueOf("2023-06-06")),
+            row(7L, "g", java.sql.Date.valueOf("2023-07-07")));
+  }
+
   @TestTemplate
   public void testUnknownReferenceAsOf() {
     assertThatThrownBy(() -> sql("SELECT * FROM %s VERSION AS OF 
'test_unknown'", tableName))

Reply via email to