This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 12bcffbebf Spark 3.3: Bug Fix for Metadata table Time Travel with Schema Evolution (#6980)
12bcffbebf is described below
commit 12bcffbebf98555449435a8aa5408435ffbc4c16
Author: Sung Yun <[email protected]>
AuthorDate: Sun Mar 5 20:00:03 2023 -0500
Spark 3.3: Bug Fix for Metadata table Time Travel with Schema Evolution (#6980)
---
.../spark/extensions/TestMetadataTables.java | 69 ++++++++++++++++++++++
.../apache/iceberg/spark/source/SparkTable.java | 7 ++-
2 files changed, 75 insertions(+), 1 deletion(-)
diff --git a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
index a60e0f5d93..2143916384 100644
--- a/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
+++ b/spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -18,6 +18,8 @@
*/
package org.apache.iceberg.spark.extensions;
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
import java.io.IOException;
import java.util.Comparator;
import java.util.List;
@@ -42,10 +44,13 @@ import org.apache.iceberg.spark.Spark3Util;
import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.TestHelpers;
import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.util.DateTimeUtils;
+import org.apache.spark.sql.types.StructType;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -488,6 +493,70 @@ public class TestMetadataTables extends SparkExtensionsTestBase {
metadataLogWithProjection);
}
+ @Test
+ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
+ // Create table and insert data
+ sql(
+ "CREATE TABLE %s (id bigint, data string) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES"
+ + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+ tableName);
+
+ List<SimpleRecord> recordsA =
+ Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
+ spark
+ .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+ table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+ List<Row> newRecords =
+ Lists.newArrayList(RowFactory.create(3, "b", "c"), RowFactory.create(4, "b", "c"));
+
+ StructType newSparkSchema =
+ SparkSchemaUtil.convert(
+ new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get()),
+ optional(3, "category", Types.StringType.get())));
+
+ spark.createDataFrame(newRecords, newSparkSchema).coalesce(1).writeTo(tableName).append();
+
+ Long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+ Dataset<Row> actualFilesDs =
+ spark.sql(
+ "SELECT * FROM "
+ + tableName
+ + ".files VERSION AS OF "
+ + currentSnapshotId
+ + " ORDER BY content");
+ List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+ Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+ List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
+ List<Record> expectedFiles =
+ expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
+
+ Assert.assertEquals("actualFiles size should be 2", 2, actualFiles.size());
+
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
+
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
+
+ Assert.assertEquals(
+ "expectedFiles and actualFiles size should be the same",
+ actualFiles.size(),
+ expectedFiles.size());
+ }
+
@Test
public void testSnapshotReferencesMetatable() throws Exception {
// Create table and insert data
diff --git a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 004be8b959..c5e367de03 100644
--- a/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -24,6 +24,7 @@ import static org.apache.iceberg.TableProperties.FORMAT_VERSION;
import java.io.IOException;
import java.util.Map;
import java.util.Set;
+import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.FileScanTask;
@@ -159,7 +160,11 @@ public class SparkTable
}
private Schema snapshotSchema() {
- return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+ if (icebergTable instanceof BaseMetadataTable) {
+ return icebergTable.schema();
+ } else {
+ return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+ }
}
@Override