szehon-ho commented on code in PR #6980:
URL: https://github.com/apache/iceberg/pull/6980#discussion_r1122466917
##########
spark/v3.3/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java:
##########
@@ -165,7 +166,11 @@ private Schema snapshotSchema() {
   @Override
   public StructType schema() {
     if (lazyTableSchema == null) {
-      this.lazyTableSchema = SparkSchemaUtil.convert(snapshotSchema());
+      if (icebergTable instanceof BaseMetadataTable) {
+        this.lazyTableSchema = SparkSchemaUtil.convert(icebergTable.schema());
Review Comment:
So this seems to disable time travel using the right schema for metadata tables. I think we should fix that in the long run, but maybe this is OK as a quick fix for now.
How about we put this logic in SnapshotUtil.schemaFor()? If the table is a BaseMetadataTable, return the current schema. That way, I think we only need to change one place in the code, and we can fix it there later when we want to implement this.
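For illustration, a minimal sketch of that SnapshotUtil change (hedged: the exact schemaFor overload and its existing body are assumptions here, not the actual code):
```
// Sketch only: short-circuit metadata tables inside SnapshotUtil.schemaFor so
// the fallback lives in a single place until per-snapshot schemas for metadata
// tables are implemented.
public static Schema schemaFor(Table table, Long snapshotId) {
  if (table instanceof BaseMetadataTable) {
    // Metadata tables do not keep historical schemas yet, so time travel
    // falls back to the current schema for now.
    return table.schema();
  }

  // Existing behavior for regular tables (assumed shape).
  return snapshotId != null ? schemaFor(table, snapshotId.longValue()) : table.schema();
}
```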
##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java:
##########
@@ -488,6 +489,54 @@ public void testMetadataLogEntries() throws Exception {
metadataLogWithProjection);
}
+  @Test
+  public void testFilesVersionAsOf() throws Exception {
+    // Create table and insert data
+    sql(
+        "CREATE TABLE %s (id bigint, data string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> recordsA =
+        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
+    spark
+        .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Long olderSnapshotId = table.currentSnapshot().snapshotId();
+
+    sql(
+        "ALTER TABLE %s ADD COLUMNS (data2 string)",
+        tableName);
+
+    List<SimpleExtraColumnRecord> recordsB =
+        Lists.newArrayList(
+            new SimpleExtraColumnRecord(1, "b", "c"), new SimpleExtraColumnRecord(2, "b", "c"));
+    spark
+        .createDataset(recordsB, Encoders.bean(SimpleExtraColumnRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    List<Object[]> res1 = sql("SELECT * from %s.files VERSION AS OF %s", tableName, olderSnapshotId);
+
+    Dataset<Row> ds =
+        spark.read().format("iceberg").option("snapshot-id", olderSnapshotId).load(tableName + ".files");
+    List<Row> res2 = ds.collectAsList();
+
+    Long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+    List<Object[]> res3 = sql("SELECT * from %s.files VERSION AS OF %s", tableName, currentSnapshotId);
+
+    Dataset<Row> ds2 =
+        spark.read().format("iceberg").option("snapshot-id", currentSnapshotId).load(tableName + ".files");
Review Comment:
For a meaningful test we will actually need to assert on the values here. You could look at other examples (testAllFilesPartitioned, which is more complete, but complex):
```
Dataset<Row> actualDataFilesDs =
    spark.sql(
        String.format("SELECT * FROM %s.files VERSION AS OF %s", tableName, currentSnapshotId));
List<ManifestFile> expectedDataManifests = TestHelpers.dataManifests(table);
List<Record> expectedDataFiles =
    expectedEntries(table, FileContent.DATA, entriesTableSchema, expectedDataManifests, null);
TestHelpers.assertEqualsSafe(
    SparkSchemaUtil.convert(TestHelpers.selectNonDerived(actualDataFilesDs).schema()).asStruct(),
    expectedDataFiles.get(0),
    TestHelpers.selectNonDerived(actualDataFilesDs).collectAsList().get(0));
```
Or, maybe easier, sort the data and check fields individually, like testSnapshotReferencesMetatable.
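For that simpler route, a hedged sketch (the ORDER BY column and the expected values are assumptions, not the final test):
```
// Sketch only: time travel to the older snapshot and spot-check fields.
// Assumes the files table's first column is `content` (0 = data file).
List<Object[]> olderFiles =
    sql("SELECT * FROM %s.files VERSION AS OF %s ORDER BY file_path", tableName, olderSnapshotId);
Assert.assertEquals("Should only see the first append's data file", 1, olderFiles.size());
Assert.assertEquals("Should be a data file", 0, olderFiles.get(0)[0]);
```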
##########
spark/v3.3/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMetadataTables.java:
##########
@@ -488,6 +489,54 @@ public void testMetadataLogEntries() throws Exception {
metadataLogWithProjection);
}
+  @Test
+  public void testFilesVersionAsOf() throws Exception {
+    // Create table and insert data
+    sql(
+        "CREATE TABLE %s (id bigint, data string) "
+            + "USING iceberg "
+            + "PARTITIONED BY (data) "
+            + "TBLPROPERTIES"
+            + "('format-version'='2', 'write.delete.mode'='merge-on-read')",
+        tableName);
+
+    List<SimpleRecord> recordsA =
+        Lists.newArrayList(new SimpleRecord(1, "a"), new SimpleRecord(2, "a"));
+    spark
+        .createDataset(recordsA, Encoders.bean(SimpleRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    Table table = Spark3Util.loadIcebergTable(spark, tableName);
+    Long olderSnapshotId = table.currentSnapshot().snapshotId();
+
+    sql(
+        "ALTER TABLE %s ADD COLUMNS (data2 string)",
+        tableName);
+
+    List<SimpleExtraColumnRecord> recordsB =
+        Lists.newArrayList(
+            new SimpleExtraColumnRecord(1, "b", "c"), new SimpleExtraColumnRecord(2, "b", "c"));
+    spark
+        .createDataset(recordsB, Encoders.bean(SimpleExtraColumnRecord.class))
+        .coalesce(1)
+        .writeTo(tableName)
+        .append();
+
+    List<Object[]> res1 = sql("SELECT * from %s.files VERSION AS OF %s", tableName, olderSnapshotId);
+
+    Dataset<Row> ds =
+        spark.read().format("iceberg").option("snapshot-id", olderSnapshotId).load(tableName + ".files");
+    List<Row> res2 = ds.collectAsList();
+
+    Long currentSnapshotId = table.currentSnapshot().snapshotId();
+
+    List<Object[]> res3 = sql("SELECT * from %s.files VERSION AS OF %s", tableName, currentSnapshotId);
Review Comment:
Let's remove the queries that don't matter here (their results are never asserted on).