aokolnychyi commented on a change in pull request #4293:
URL: https://github.com/apache/iceberg/pull/4293#discussion_r823013723



##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testOverwriteFilterSerializableIsolation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, 
SimpleRecord.class).writeTo(tableName).append();
+
+    // Validating from previous snapshot finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, 
SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new data files should throw 
exception",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching 
ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(snapshotId))
+                .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterNoSnapshotIdValidation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();

Review comment:
       nit: is this used?

##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testOverwriteFilterSerializableIsolation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, 
SimpleRecord.class).writeTo(tableName).append();
+
+    // Validating from previous snapshot finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, 
SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new data files should throw 
exception",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching 
ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(snapshotId))
+                .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterNoSnapshotIdValidation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(

Review comment:
       nit: this can fit on one line

##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testOverwriteFilterSerializableIsolation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(

Review comment:
       nit: can fit on one line?

##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testOverwriteFilterSerializableIsolation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();

Review comment:
       nit: Iceberg does not mark local vars/args as final unless required. I 
know there were some style or even performance (in old JVMs) reasons for doing 
so, but I think we should be consistent. 

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -319,6 +318,22 @@ public void commit(WriterCommitMessage[] messages) {
         overwriteFiles.addFile(file);
       }
 
+      IsolationLevel isolationLevel = writeConf.isolationLevel();
+      Long validateFromSnapshotId = writeConf.validateFromSnapshotId();
+
+      if (isolationLevel != null && validateFromSnapshotId != null) {
+        overwriteFiles.validateFromSnapshot(validateFromSnapshotId);
+      }
+
+      if (isolationLevel == SERIALIZABLE) {
+        overwriteFiles.validateNoConflictingDeletes();
+        overwriteFiles.validateNoConflictingData();
+
+      } else if (isolationLevel == SNAPSHOT) {
+        overwriteFiles.validateNoConflictingDeletes();
+

Review comment:
       nit: same here, is the empty line at the end of the last if clause 
necessary?

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -277,18 +277,17 @@ public void commit(WriterCommitMessage[] messages) {
       IsolationLevel isolationLevel = writeConf.isolationLevel();
       Long validateFromSnapshotId = writeConf.validateFromSnapshotId();
 
+      if (isolationLevel != null && validateFromSnapshotId != null) {
+        dynamicOverwrite.validateFromSnapshot(validateFromSnapshotId);
+      }
+
       if (isolationLevel == SERIALIZABLE) {
-        if (validateFromSnapshotId != null) {

Review comment:
       Would it make sense to submit all changes to the existing code in a 
separate PR? I see some modifications in existing tests too.

##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testOverwriteFilterSerializableIsolation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, 
SimpleRecord.class).writeTo(tableName).append();
+
+    // Validating from previous snapshot finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, 
SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new data files should throw 
exception",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching 
ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(snapshotId))
+                .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterNoSnapshotIdValidation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, 
SimpleRecord.class).writeTo(tableName).append();
+
+    // Validating from no snapshot id defaults to beginning snapshot id and 
finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, 
SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new data files should throw 
exception",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching 
ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterSnapshotIsolation() throws Exception {
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(1, "b"));
+    spark.createDataFrame(records, 
SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    // This should add a delete file
+    sql("DELETE FROM %s WHERE id='1' and data='b'", tableName);
+    table.refresh();
+
+    // Validating from previous snapshot finds conflicts
+    records = Lists.newArrayList(new SimpleRecord(1, "a"));

Review comment:
       What about having a separate list called `conflictingRecords` instead of 
reassigning?

##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testOverwriteFilterSerializableIsolation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, 
SimpleRecord.class).writeTo(tableName).append();
+
+    // Validating from previous snapshot finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, 
SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new data files should throw 
exception",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching 
ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(snapshotId))
+                .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterNoSnapshotIdValidation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"));
+    spark.createDataFrame(records, 
SimpleRecord.class).writeTo(tableName).append();
+
+    // Validating from no snapshot id defaults to beginning snapshot id and 
finds conflicts
+    Dataset<Row> conflictingDf = spark.createDataFrame(records, 
SimpleRecord.class);
+    AssertHelpers.assertThrowsCause("Conflicting new data files should throw 
exception",
+        ValidationException.class,
+        "Found conflicting files that can contain records matching 
ref(name=\"id\") == 1:",
+        () -> {
+          try {
+            conflictingDf.writeTo(tableName)
+                .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+                .overwrite(functions.col("id").equalTo(1));
+          } catch (NoSuchTableException e) {
+            throw new RuntimeException(e);
+          }
+        });
+
+    // Validating from latest snapshot should succeed
+    table.refresh();
+    long newSnapshotId = table.currentSnapshot().snapshotId();
+    conflictingDf.writeTo(tableName)
+        .option(SparkWriteOptions.VALIDATE_FROM_SNAPSHOT_ID, 
String.valueOf(newSnapshotId))
+        .option(SparkWriteOptions.ISOLATION_LEVEL, 
IsolationLevel.SERIALIZABLE.toString())
+        .overwrite(functions.col("id").equalTo(1));
+  }
+
+  @Test
+  public void testOverwriteFilterSnapshotIsolation() throws Exception {
+    List<SimpleRecord> records = Lists.newArrayList(
+        new SimpleRecord(1, "a"),
+        new SimpleRecord(1, "b"));
+    spark.createDataFrame(records, 
SimpleRecord.class).coalesce(1).writeTo(tableName).append();
+
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();
+
+    // This should add a delete file
+    sql("DELETE FROM %s WHERE id='1' and data='b'", tableName);

Review comment:
       Can we add a test with delete files for serializable isolation too?

##########
File path: 
spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestConflictValidation.java
##########
@@ -56,6 +57,130 @@ public void removeTables() {
     sql("DROP TABLE IF EXISTS %s", tableName);
   }
 
+  @Test
+  public void testOverwriteFilterSerializableIsolation() throws Exception {
+    Table table = validationCatalog.loadTable(tableIdent);
+    final long snapshotId = table.currentSnapshot().snapshotId();

Review comment:
       There are a few places like this (local variables marked as final) in 
this file.

##########
File path: 
spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -277,18 +277,17 @@ public void commit(WriterCommitMessage[] messages) {
       IsolationLevel isolationLevel = writeConf.isolationLevel();
       Long validateFromSnapshotId = writeConf.validateFromSnapshotId();
 
+      if (isolationLevel != null && validateFromSnapshotId != null) {
+        dynamicOverwrite.validateFromSnapshot(validateFromSnapshotId);
+      }
+
       if (isolationLevel == SERIALIZABLE) {
-        if (validateFromSnapshotId != null) {
-          dynamicOverwrite.validateFromSnapshot(validateFromSnapshotId);
-        }
         dynamicOverwrite.validateNoConflictingData();
         dynamicOverwrite.validateNoConflictingDeletes();
 
       } else if (isolationLevel == SNAPSHOT) {
-        if (validateFromSnapshotId != null) {
-          dynamicOverwrite.validateFromSnapshot(validateFromSnapshotId);
-        }
         dynamicOverwrite.validateNoConflictingDeletes();
+

Review comment:
       nit: is this necessary? We do sometimes add empty lines at the end of if 
clauses but not in the last one.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to