szehon-ho commented on a change in pull request #4293:
URL: https://github.com/apache/iceberg/pull/4293#discussion_r823262066
##########
File path:
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
+ @Test
+ public void testOverwriteFilterSerializableIsolation() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
+
+ List<SimpleRecord> records = Lists.newArrayList(
Review comment:
Done, was keeping consistent with the existing test. Will change those
in another pr.
##########
File path:
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
+ @Test
+ public void testOverwriteFilterSerializableIsolation() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"));
+ spark.createDataFrame(records,
SimpleRecord.class).writeTo(tableName).append();
+
+ // Validating from previous snapshot finds conflicts
+ Dataset<Row> conflictingDf = spark.createDataFrame(records,
SimpleRecord.class);
+ AssertHelpers.assertThrowsCause("Conflicting new data files should throw
exception",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching
ref(name=\"id\") == 1:",
+ () -> {
+ try {
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID,
String.valueOf(snapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL,
IsolationLevel.SERIALIZABLE.toString())
+ .overwrite(functions.col("id").equalTo(1));
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Validating from latest snapshot should succeed
+ table.refresh();
+ long newSnapshotId = table.currentSnapshot().snapshotId();
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID,
String.valueOf(newSnapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL,
IsolationLevel.SERIALIZABLE.toString())
+ .overwrite(functions.col("id").equalTo(1));
+ }
+
+ @Test
+ public void testOverwriteFilterNoSnapshotIdValidation() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
Review comment:
Good catch, removed
##########
File path:
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
+ @Test
+ public void testOverwriteFilterSerializableIsolation() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"));
+ spark.createDataFrame(records,
SimpleRecord.class).writeTo(tableName).append();
+
+ // Validating from previous snapshot finds conflicts
+ Dataset<Row> conflictingDf = spark.createDataFrame(records,
SimpleRecord.class);
+ AssertHelpers.assertThrowsCause("Conflicting new data files should throw
exception",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching
ref(name=\"id\") == 1:",
+ () -> {
+ try {
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID,
String.valueOf(snapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL,
IsolationLevel.SERIALIZABLE.toString())
+ .overwrite(functions.col("id").equalTo(1));
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Validating from latest snapshot should succeed
+ table.refresh();
+ long newSnapshotId = table.currentSnapshot().snapshotId();
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID,
String.valueOf(newSnapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL,
IsolationLevel.SERIALIZABLE.toString())
+ .overwrite(functions.col("id").equalTo(1));
+ }
+
+ @Test
+ public void testOverwriteFilterNoSnapshotIdValidation() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
+
+ List<SimpleRecord> records = Lists.newArrayList(
Review comment:
Done
##########
File path:
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
sql("DROP TABLE IF EXISTS %s", tableName);
}
+ @Test
+ public void testOverwriteFilterSerializableIsolation() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"));
+ spark.createDataFrame(records,
SimpleRecord.class).writeTo(tableName).append();
+
+ // Validating from previous snapshot finds conflicts
+ Dataset<Row> conflictingDf = spark.createDataFrame(records,
SimpleRecord.class);
+ AssertHelpers.assertThrowsCause("Conflicting new data files should throw
exception",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching
ref(name=\"id\") == 1:",
+ () -> {
+ try {
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID,
String.valueOf(snapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL,
IsolationLevel.SERIALIZABLE.toString())
+ .overwrite(functions.col("id").equalTo(1));
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Validating from latest snapshot should succeed
+ table.refresh();
+ long newSnapshotId = table.currentSnapshot().snapshotId();
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID,
String.valueOf(newSnapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL,
IsolationLevel.SERIALIZABLE.toString())
+ .overwrite(functions.col("id").equalTo(1));
+ }
+
+ @Test
+ public void testOverwriteFilterNoSnapshotIdValidation() throws Exception {
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"));
+ spark.createDataFrame(records,
SimpleRecord.class).writeTo(tableName).append();
+
+ // Validating from no snapshot id defaults to beginning snapshot id and
finds conflicts
+ Dataset<Row> conflictingDf = spark.createDataFrame(records,
SimpleRecord.class);
+ AssertHelpers.assertThrowsCause("Conflicting new data files should throw
exception",
+ ValidationException.class,
+ "Found conflicting files that can contain records matching
ref(name=\"id\") == 1:",
+ () -> {
+ try {
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.ISOLATION_LEVEL,
IsolationLevel.SERIALIZABLE.toString())
+ .overwrite(functions.col("id").equalTo(1));
+ } catch (NoSuchTableException e) {
+ throw new RuntimeException(e);
+ }
+ });
+
+ // Validating from latest snapshot should succeed
+ table.refresh();
+ long newSnapshotId = table.currentSnapshot().snapshotId();
+ conflictingDf.writeTo(tableName)
+ .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID,
String.valueOf(newSnapshotId))
+ .option(SparkWriteOptions.ISOLATION_LEVEL,
IsolationLevel.SERIALIZABLE.toString())
+ .overwrite(functions.col("id").equalTo(1));
+ }
+
+ @Test
+ public void testOverwriteFilterSnapshotIsolation() throws Exception {
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(1, "b"));
+ spark.createDataFrame(records,
SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+ Table table = validationCatalog.loadTable(tableIdent);
+ final long snapshotId = table.currentSnapshot().snapshotId();
+
+ // This should add a delete file
+ sql("DELETE FROM %s WHERE id='1' and data='b'", tableName);
Review comment:
Added
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]