szehon-ho commented on a change in pull request #4293:
URL: https://github.com/apache/iceberg/pull/4293#discussion_r823263349



##########
File path: spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testOverwriteFilterSerializableIsolation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, 
SimpleRecord.class).writeTo(tableName).append();
+
+    // Validating from previous snapshot finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, 
SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new data files should throw 
exception",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching 
ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(snapshotId))
+                .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterNoSnapshotIdValidation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, 
SimpleRecord.class).writeTo(tableName).append();
+
+    // Validating from no snapshot id defaults to beginning snapshot id and finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, 
SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new data files should throw 
exception",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching 
ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterSnapshotIsolation() throws Exception {
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(1, "b"));
+    spark.createDataFrame(records, 
SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    // This should add a delete file
+    sql("DELETE FROM %s WHERE id='1' and data='b'", tableName);
+    table.refresh();
+
+    // Validating from previous snapshot finds conflicts
+    records = Lists.newArrayList(new SimpleRecord(1, "a"));

Review comment:
       Done




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to