This is an automated email from the ASF dual-hosted git repository.
szehon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new dd1f4ab006 Spark 3.1: Bug Fix for Metadata table Time Travel with
Schema Evolution (#6993)
dd1f4ab006 is described below
commit dd1f4ab0067e228c9aeb253599467149cbf82a81
Author: Sung Yun <[email protected]>
AuthorDate: Sun Mar 5 20:02:26 2023 -0500
Spark 3.1: Bug Fix for Metadata table Time Travel with Schema Evolution
(#6993)
---
.../spark/extensions/TestMetadataTables.java | 166 +++++++++++++++++++++
.../apache/iceberg/spark/source/SparkTable.java | 7 +-
.../org/apache/iceberg/spark/data/TestHelpers.java | 33 +++-
3 files changed, 203 insertions(+), 3 deletions(-)
diff --git
a/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
new file mode 100644
index 0000000000..2cdefa731a
--- /dev/null
+++
b/spark/v3.1/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.spark.extensions;
+
+import static org.apache.iceberg.types.Types.NestedField.optional;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.avro.Avro;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.Spark3Util;
+import org.apache.iceberg.spark.SparkSchemaUtil;
+import org.apache.iceberg.spark.data.TestHelpers;
+import org.apache.iceberg.spark.source.SimpleRecord;
+import org.apache.iceberg.types.Types;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Encoders;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.types.StructType;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMetadataTables extends SparkExtensionsTestBase {
+
+ public TestMetadataTables(String catalogName, String implementation,
Map<String, String> config) {
+ super(catalogName, implementation, config);
+ }
+
+ @After
+ public void removeTables() {
+ sql("DROP TABLE IF EXISTS %s", tableName);
+ }
+
+ @Test
+ public void testFilesTableTimeTravelWithSchemaEvolution() throws Exception {
+ // Create table and insert data
+ sql(
+ "CREATE TABLE %s (id bigint, data string) "
+ + "USING iceberg "
+ + "PARTITIONED BY (data) "
+ + "TBLPROPERTIES"
+ + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+ tableName);
+
+ List<SimpleRecord> recordsA =
+ Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
+ spark
+ .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+ .coalesce(1)
+ .writeTo(tableName)
+ .append();
+
+ Table table = Spark3Util.loadIcebergTable(spark, tableName);
+
+ table.updateSchema().addColumn("category",
Types.StringType.get()).commit();
+
+ List<Row> newRecords =
+ Lists.newArrayList(RowFactory.create(3, "b", "c"),
RowFactory.create(4, "b", "c"));
+
+ StructType newSparkSchema =
+ SparkSchemaUtil.convert(
+ new Schema(
+ optional(1, "id", Types.IntegerType.get()),
+ optional(2, "data", Types.StringType.get()),
+ optional(3, "category", Types.StringType.get())));
+
+ spark.createDataFrame(newRecords,
newSparkSchema).coalesce(1).writeTo(tableName).append();
+
+ Long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+ Dataset<Row> actualFilesDs =
+ spark
+ .read()
+ .format("iceberg")
+ .option("snapshot-id", currentSnapshotId)
+ .load(tableName + ".files")
+ .orderBy("content");
+
+ List<Row> actualFiles =
TestHelpers.selectNonDerived(actualFilesDs).collectAsList();
+ Schema entriesTableSchema = Spark3Util.loadIcebergTable(spark, tableName +
".entries").schema();
+ List<ManifestFile> expectedDataManifests =
TestHelpers.dataManifests(table);
+ List<Record> expectedFiles =
+ expectedEntries(table, FileContent.DATA, entriesTableSchema,
expectedDataManifests, null);
+
+ Assert.assertEquals("actualFiles size should be 2", 2, actualFiles.size());
+
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(0),
actualFiles.get(0));
+
+ TestHelpers.assertEqualsSafe(
+ TestHelpers.nonDerivedSchema(actualFilesDs), expectedFiles.get(1),
actualFiles.get(1));
+
+ Assert.assertEquals(
+ "expectedFiles and actualFiles size should be the same",
+ actualFiles.size(),
+ expectedFiles.size());
+ }
+
+ /**
+ * Find matching manifest entries of an Iceberg table
+ *
+ * @param table iceberg table
+ * @param expectedContent file content to populate on entries
+ * @param entriesTableSchema schema of Manifest entries
+ * @param manifestsToExplore manifests to explore of the table
+ * @param partValue partition value that manifest entries must match, or
null to skip filtering
+ */
+ private List<Record> expectedEntries(
+ Table table,
+ FileContent expectedContent,
+ Schema entriesTableSchema,
+ List<ManifestFile> manifestsToExplore,
+ String partValue)
+ throws IOException {
+ List<Record> expected = Lists.newArrayList();
+ for (ManifestFile manifest : manifestsToExplore) {
+ InputFile in = table.io().newInputFile(manifest.path());
+ try (CloseableIterable<Record> rows =
Avro.read(in).project(entriesTableSchema).build()) {
+ for (Record record : rows) {
+ if ((Integer) record.get("status") < 2 /* added or existing */) {
+ Record file = (Record) record.get("data_file");
+ if (partitionMatch(file, partValue)) {
+ TestHelpers.asMetadataRecord(file, expectedContent);
+ expected.add(file);
+ }
+ }
+ }
+ }
+ }
+ return expected;
+ }
+
+ private boolean partitionMatch(Record file, String partValue) {
+ if (partValue == null) {
+ return true;
+ }
+ Record partition = (Record) file.get(4);
+ return partValue.equals(partition.get(0).toString());
+ }
+}
diff --git
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
index 3f7be9d006..eebbce9413 100644
---
a/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
+++
b/spark/v3.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java
@@ -27,6 +27,7 @@ import static
org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT;
import java.util.Map;
import java.util.Set;
+import org.apache.iceberg.BaseMetadataTable;
import org.apache.iceberg.MetadataColumns;
import org.apache.iceberg.Partitioning;
import org.apache.iceberg.RowLevelOperationMode;
@@ -124,7 +125,11 @@ public class SparkTable
}
  /**
   * Returns the schema to expose for reads of this table at the selected snapshot.
   *
   * <p>Metadata tables have their own fixed schema that is independent of any snapshot, so the
   * table's schema is returned directly; deriving a snapshot schema for them broke metadata-table
   * time travel after schema evolution (#6993). All other tables resolve the schema for the
   * selected snapshot id.
   */
  private Schema snapshotSchema() {
    if (icebergTable instanceof BaseMetadataTable) {
      return icebergTable.schema();
    } else {
      return SnapshotUtil.schemaFor(icebergTable, snapshotId, null);
    }
  }
@Override
diff --git
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
index 42f4c1a1ab..65539edbe6 100644
---
a/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
+++
b/spark/v3.1/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java
@@ -36,15 +36,22 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.util.stream.Stream;
import org.apache.arrow.vector.ValueVector;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
+import org.apache.iceberg.FileContent;
+import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.spark.SparkSchemaUtil;
import org.apache.iceberg.spark.data.vectorized.IcebergArrowColumnVector;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.orc.storage.serde2.io.DateWritable;
+import org.apache.spark.sql.Column;
+import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericRow;
@@ -109,7 +116,7 @@ public class TestHelpers {
}
}
- private static void assertEqualsSafe(Types.ListType list, Collection<?>
expected, List actual) {
+ public static void assertEqualsSafe(Types.ListType list, Collection<?>
expected, List actual) {
Type elementType = list.elementType();
List<?> expectedElements = Lists.newArrayList(expected);
for (int i = 0; i < expectedElements.size(); i += 1) {
@@ -144,7 +151,7 @@ public class TestHelpers {
private static final LocalDate EPOCH_DAY = EPOCH.toLocalDate();
@SuppressWarnings("unchecked")
- private static void assertEqualsSafe(Type type, Object expected, Object
actual) {
+ public static void assertEqualsSafe(Type type, Object expected, Object
actual) {
if (expected == null && actual == null) {
return;
}
@@ -767,4 +774,26 @@ public class TestHelpers {
actualValues.isNullAt(i) ? null : actualValues.get(i, valueType));
}
}
+
+ public static void asMetadataRecord(GenericData.Record file, FileContent
content) {
+ file.put(0, content.id());
+ file.put(3, 0); // specId
+ }
+
+ public static List<ManifestFile> dataManifests(Table table) {
+ return table.currentSnapshot().dataManifests(table.io());
+ }
+
+ public static Dataset<Row> selectNonDerived(Dataset<Row> metadataTable) {
+ StructField[] fields = metadataTable.schema().fields();
+ return metadataTable.select(
+ Stream.of(fields)
+ .filter(f -> !f.name().equals("readable_metrics")) // derived field
+ .map(f -> new Column(f.name()))
+ .toArray(Column[]::new));
+ }
+
+ public static Types.StructType nonDerivedSchema(Dataset<Row> metadataTable) {
+ return
SparkSchemaUtil.convert(TestHelpers.selectNonDerived(metadataTable).schema()).asStruct();
+ }
}