This is an automated email from the ASF dual-hosted git repository.

szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new dd1f4ab006 Spark 3.1: Bug Fix for Metadata table Time Travel with Schema Evolution (#6993)
dd1f4ab006 is described below

commit dd1f4ab0067e228c9aeb253599467149cbf82a81
Author: Sung Yun <[email protected]>
AuthorDate: Sun Mar 5 20:02:26 2023 -0500

    Spark 3.1: Bug Fix for Metadata table Time Travel with Schema Evolution (#6993)
---
 .../spark/extensions/TestMetadataTables.java       | 166 +++++++++++++++++++++
 .../apache/iceberg/spark/source/SparkTable.java    |   7 +-
 .../org/apache/iceberg/spark/data/TestHelpers.java |  33 +++-
 3 files changed, 203 insertions(+), 3 deletions(-)

diff --git a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
new file mode 100644
index 0000000000..2cdefa731a
--- /dev/null
+++ b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMetadataTables extends SparkExtensionsTestBase {
+
+  public TestMetadataTables(String catalogName, String implementation, Map<String, String> config) {
+    super(catalogName, implementation, config);
+  }
+
+  @After
+  public void removeTables() {
+    sql("DROP TABLE IF EXISTS %s", tableName);
+  }
+
+  @Test
+  public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
+    // Create table and insert data
+    sql(
+        "CREATE TABLE %s (id bigint, data string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> recordsA =
+        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
+    spark
+        .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+    table.updateSchema().addColumn("category", Types.StringType.get()).commit();
+
+    List<Row> newRecords =
+        Lists.newArrayList(RowFactory.create(3, "b", "c"), RowFactory.create(4, "b", "c"));
+
+    StructType newSparkSchema =
+        SparkSchemaUtil.convert(
+            new Schema(
+                optional(1, "id", Types.IntegerType.get()),
+                optional(2, "data", Types.StringType.get()),
+                optional(3, "category", Types.StringType.get())));
+
+    spark.createDataFrame(newRecords, newSparkSchema).coalesce(1).writeTo(tableName).append();
+
+    Long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+    Dataset<Row> actualFilesDs =
+        spark
+            .read()
+            .format("iceberg")
+            .option("snapshot-id", currentSnapshotId)
+            .load(tableName + ".files")
+            .orderBy("content");
+
+    List<Row> actualFiles = TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+    Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName + ".entries").schema();
+    List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
+    List<Record> expectedFiles =
+        expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
+
+    Assert.assertEquals("actualFiles size should be 2", 2, actualFiles.size());
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0), actualFiles.get(0));
+
+    TestHelpers.assertEqualsSafe(
+        TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1), actualFiles.get(1));
+
+    Assert.assertEquals(
+        "expectedFiles and actualFiles size should be the same",
+        actualFiles.size(),
+        expectedFiles.size());
+  }
+
+  /**
+   * Find matching manifest entries of an Iceberg table
+   *
+   * @param table iceberg table
+   * @param expectedContent file content to populate on entries
+   * @param entriesTableSchema schema of Manifest entries
+   * @param manifestsToExplore manifests to explore of the table
+   * @param partValue partition value that manifest entries must match, or null to skip filtering
+   */
+  private List<Record> expectedEntries(
+      Table table,
+      FileContent expectedContent,
+      Schema entriesTableSchema,
+      List<ManifestFile> manifestsToExplore,
+      String partValue)
+      throws IOException {
+    List<Record> expected = Lists.newArrayList();
+    for (ManifestFile manifest : manifestsToExplore) {
+      InputFile in = table.io().newInputFile(manifest.path());
+      try (CloseableIterable<Record> rows = Avro.read(in).project(entriesTableSchema).build()) {
+        for (Record record : rows) {
+          if ((Integer) record.get("status") < 2 /* added or existing */) {
+            Record file = (Record) record.get("data_file");
+            if (partitionMatch(file, partValue)) {
+              TestHelpers.asMetadataRecord(file, expectedContent);
+              expected.add(file);
+            }
+          }
+        }
+      }
+    }
+    return expected;
+  }
+
+  private boolean partitionMatch(Record file, String partValue) {
+    if (partValue == null) {
+      return true;
+    }
+    Record partition = (Record) file.get(4);
+    return partValue.equals(partition.get(0).toString());
+  }
+}
diff --git a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 3f7be9d006..eebbce9413 100644
--- a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++ b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -27,6 +27,7 @@ import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
 
 import java.util.Map;
 import java.util.Set;
+import org.apache.iceberg.BaseMetadataTable;
 import org.apache.iceberg.MetadataColumns;
 import org.apache.iceberg.Partitioning;
 import org.apache.iceberg.RowLevelOperationMode;
@@ -124,7 +125,11 @@ public class SparkTable
   }
 
   private Schema snapshotSchema() {
-    return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+    if (icebergTable instanceof BaseMetadataTable) {
+      return icebergTable.schema();
+    } else {
+      return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
+    }
   }
 
   @Override
diff --git a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 42f4c1a1ab..65539edbe6 100644
--- a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++ b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -36,15 +36,22 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
+import java.util.stream.Stream;
 import org.apache.arrow.vector.ValueVector;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericRow;
@@ -109,7 +116,7 @@ public class TestHelpers {
     }
   }
 
-  private static void assertEqualsSafe(Types.ListType list, Collection<?> expected, List actual) {
+  public static void assertEqualsSafe(Types.ListType list, Collection<?> expected, List actual) {
     Type elementType = list.elementType();
     List<?> expectedElements = Lists.newArrayList(expected);
     for (int i = 0; i < expectedElements.size(); i += 1) {
@@ -144,7 +151,7 @@ public class TestHelpers {
   private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
 
   @SuppressWarnings("unchecked")
-  private static void assertEqualsSafe(Type type, Object expected, Object actual) {
+  public static void assertEqualsSafe(Type type, Object expected, Object actual) {
     if (expected == null && actual == null) {
       return;
     }
@@ -767,4 +774,26 @@ public class TestHelpers {
           actualValues.isNullAt(i) ? null : actualValues.get(i, valueType));
     }
   }
+
+  public static void asMetadataRecord(GenericData.Record file, FileContent content) {
+    file.put(0, content.id());
+    file.put(3, 0); // specId
+  }
+
+  public static List<ManifestFile> dataManifests(Table table) {
+    return table.currentSnapshot().dataManifests(table.io());
+  }
+
+  public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+    StructField[] fields = metadataTable.schema().fields();
+    return metadataTable.select(
+        Stream.of(fields)
+            .filter(f -> !f.name().equals("readable_metrics")) // derived field
+            .map(f -> new Column(f.name()))
+            .toArray(Column[]::new));
+  }
+
+  public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+    return SparkSchemaUtil.convert(TestHelpers.selectNonDerived(metadataTable).schema()).asStruct();
+  }
 }

Reply via email to