This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 12bcffbebf Spark 3.3: Bug Fix for Metadata table Time Travel with Schema Evolution (#6980)
12bcffbebf is described below

commit 12bcffbebf98555449435a8aa5408435ffbc4c16
Author: Sung Yun <[email protected]>
AuthorDate: Sun Mar 5 20:00:03 2023 -0500

    Spark 3.3: Bug Fix for Metadata table Time Travel with Schema Evolution (#6980)
---
 .../spark/extensions/TestMetadataTables.java       | 69 ++++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkTable.java    |  7 ++-
 2 files changed, 75 insertions(+), 1 deletion(-)

diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index a60e0f5d93..2143916384 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -18,6 +18,8 @@
  */
 package org.apache.iceberg.spark.extensions;
 
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
 import java.io.IOException;
 import java.util.Comparator;
 import java.util.List;
@@ -42,10 +44,13 @@ import org.apache.iceberg.spark.Spark3Util;
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.TestHelpers;
 import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.Types;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Encoders;
 import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.types.StructType;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
@@ -488,6 +493,70 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
         metadataLogWithProjection);
   }
 
+  @Test
+  public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
+    // Create table and insert data
+    sql(
+        "CREATE TABLE %s (id bigint, data string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> recordsA =
+        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
+    spark
+        .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    table.updateSchema().addColumn("category", 
Types.StringType.get()).commit();
+
+    List<Row> newRecords =
+        Lists.newArrayList(RowFactory.create(3, "b", "c"), 
RowFactory.create(4, "b", "c"));
+
+    StructType newSparkSchema =
+        SparkSchemaUtil.convert(
+            new Schema(
+                optional(1, "id", Types.IntegerType.get()),
+                optional(2, "data", Types.StringType.get()),
+                optional(3, "category", Types.StringType.get())));
+
+    spark.createDataFrame(newRecords, 
newSparkSchema).coalesce(1).writeTo(tableName).append();
+
+    Long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+    Dataset<Row> actualFilesDs =
+        spark.sql(
+            "SELECT * FROM "
+                + tableName
+                + ".files VERSION AS OF "
+                + currentSnapshotId
+                + " ORDER BY content");
+    List<Row> actualFiles = 
TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + 
".entries").schema();
+    List<ManifestFile> expectedDataManifests = 
TestHelpers.dataManifests(table);
+    List<Record> expectedFiles =
+        expectedEntries(table, FileContent.DATA, entriesTableSchema, 
expectedDataManifests, null);
+
+    Assert.assertEquals("actualFiles size should be 2", 2, actualFiles.size());
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), 
actualFiles.get(0));
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), 
actualFiles.get(1));
+
+    Assert.assertEquals(
+        "expectedFiles and actualFiles size should be the same",
+        actualFiles.size(),
+        expectedFiles.size());
+  }
+
   @Test
   public void testSnapshotReferencesMetatable() throws Exception {
     // Create table and insert data
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 004be8b959..c5e367de03 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -24,6 +24,7 @@ import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
 import java.io.IOException;
 import java.util.Map;
 import java.util.Set;
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.BaseTable;
 import org.apache.iceberg.DataFile;
 import org.apache.iceberg.FileScanTask;
@@ -159,7 +160,11 @@ public class SparkTable
   }
 
   private Schema snapshotSchema() {
-    return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+    if (icebergTable instanceof BaseMetadataTable) {
+      return icebergTable.schema();
+    } else {
+      return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+    }
   }
 
   @Override

Reply via email to