nastra opened a new issue, #7851:
URL: https://github.com/apache/iceberg/issues/7851

   ### Apache Iceberg version
   
   1.3.0 (latest release)
   
   ### Query engine
   
   Spark
   
   ### Please describe the bug 🐞
   
   
   # Using WAP Branch + DELETE with a WHERE :heavy_check_mark: 
   This works as expected, as can be seen below: `DELETE FROM nyc.permits WHERE borough='Manhattan'` deletes only from `audit_branch`, and `main` still has `1000` records.
   
   ```
   scala> spark.sql("select count(*) from nyc.permits").show();
   +--------+
   |count(1)|
   +--------+
   |    1000|
   +--------+
   
   
   scala> spark.sql("SELECT borough, count(*) permit_cnt FROM nyc.permits GROUP BY borough").show()
   +-------------+----------+
   |      borough|permit_cnt|
   +-------------+----------+
   |       Queens|       168|
   |        Bronx|        28|
   |    Manhattan|       463|
   |     Brooklyn|       334|
   |Staten Island|         7|
   +-------------+----------+
   
   
   scala> spark.sql("alter table nyc.permits SET TBLPROPERTIES ('write.wap.enabled'='true')")
   23/06/16 12:22:22 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
   res6: org.apache.spark.sql.DataFrame = []
   
   
   scala> spark.sql("alter table nyc.permits create branch audit_branch")
   ANTLR Tool version 4.8 used for code generation does not match the current runtime version 4.9.3
   ANTLR Tool version 4.8 used for code generation does not match the current runtime version 4.9.3
   23/06/16 12:23:10 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
   res8: org.apache.spark.sql.DataFrame = []
   
   scala> spark.conf.set("spark.wap.branch", "audit_branch")
   
   scala> spark.conf.get("spark.wap.branch")
   res10: String = audit_branch
   
   scala> spark.sql("DELETE FROM nyc.permits WHERE borough='Manhattan'")
   res11: org.apache.spark.sql.DataFrame = []                                   
   
   
   scala> spark.sql("select * from nyc.permits.refs").show();
   +--------------+------+-------------------+-----------------------+---------------------+----------------------+
   |          name|  type|        snapshot_id|max_reference_age_in_ms|min_snapshots_to_keep|max_snapshot_age_in_ms|
   +--------------+------+-------------------+-----------------------+---------------------+----------------------+
   |  audit_branch|BRANCH| 993079427284131780|                   null|                 null|                  null|
   |          main|BRANCH|3715865987057790507|                   null|                 null|                  null|
   |    etl_job_42|BRANCH|4058805292508318372|                   null|                 null|                  null|
   |audit_branch_2|BRANCH|3715865987057790507|                   null|                 null|                  null|
   +--------------+------+-------------------+-----------------------+---------------------+----------------------+
   
   
   scala> spark.sql("select count(*) from nyc.permits").show();
   +--------+
   |count(1)|
   +--------+
   |     537|
   +--------+
   
   
   scala> spark.sql("select count(*) from nyc.permits version as of 'main'").show();
   +--------+
   |count(1)|
   +--------+
   |    1000|
   +--------+
   ```
   
   
   
   # Using WAP Branch + DELETE without a WHERE :bug: 
   Performing a `DELETE FROM nyc.permits` deletes all data from `main`, even though the WAP branch was set to `audit_branch`. The WAP branch itself still reports `1000` records afterwards.
   
   ```
   scala> spark.sql("select count(*) from nyc.permits").show();
   +--------+
   |count(1)|
   +--------+
   |    1000|
   +--------+
   
   
   scala> spark.sql("SELECT borough, count(*) permit_cnt FROM nyc.permits GROUP BY borough").show()
   +-------------+----------+
   |      borough|permit_cnt|
   +-------------+----------+
   |       Queens|       168|
   |        Bronx|        28|
   |    Manhattan|       463|
   |     Brooklyn|       334|
   |Staten Island|         7|
   +-------------+----------+
   
   
   scala> spark.sql("alter table nyc.permits SET TBLPROPERTIES ('write.wap.enabled'='true')")
   23/06/16 12:22:22 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
   res6: org.apache.spark.sql.DataFrame = []
   
   
   scala> spark.sql("alter table nyc.permits create branch audit_branch")
   ANTLR Tool version 4.8 used for code generation does not match the current runtime version 4.9.3
   ANTLR Tool version 4.8 used for code generation does not match the current runtime version 4.9.3
   23/06/16 12:23:10 WARN BaseTransaction: Failed to load metadata for a committed snapshot, skipping clean-up
   res8: org.apache.spark.sql.DataFrame = []
   
   scala> spark.conf.set("spark.wap.branch", "audit_branch")
   
   scala> spark.conf.get("spark.wap.branch")
   res10: String = audit_branch
   
   scala> spark.sql("DELETE FROM nyc.permits")
   res11: org.apache.spark.sql.DataFrame = []                                   
   
   
   
   scala> spark.sql("select count(*) from nyc.permits").show();
   +--------+
   |count(1)|
   +--------+
   |    1000|
   +--------+
   
   
   scala> spark.sql("select count(*) from nyc.permits version as of 'main'").show();
   +--------+
   |count(1)|
   +--------+
   |       0|
   +--------+
   ```
   
   # Analysis
   
   In the first example, the write goes through a subclass of `SparkWrite`, so the WAP branch is read from `SparkWriteConf`.

   In the second example, we end up in [SparkTable#deleteWhere(..)](https://github.com/apache/iceberg/blob/893af4a19841ae23e18b1e2196df9176d9d90bc2/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkTable.java#L361-L384), where the branch from `SparkWriteConf` is not available. The `branch` variable in that method is the one parsed from the table name, so the configured WAP branch is ignored and the delete is applied to `main`.
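
To make the difference between the two code paths concrete, here is a minimal, self-contained sketch of the branch resolution described above. It is not Iceberg code: `WapBranchSketch`, `identifierBranchOnly` and `withWapFallback` are hypothetical names, and the rule that an explicit branch in the table name takes precedence over the WAP branch is an assumption made only for this illustration.

```java
import java.util.Map;

/** Hypothetical model of target-branch resolution; not Iceberg's actual implementation. */
public class WapBranchSketch {
  static final String MAIN_BRANCH = "main";
  static final String WAP_ENABLED = "write.wap.enabled"; // table property used in the report
  static final String WAP_BRANCH = "spark.wap.branch"; // session conf used in the report

  /** Models the reported behavior: only the branch parsed from the table name is considered. */
  public static String identifierBranchOnly(String identifierBranch) {
    return identifierBranch != null ? identifierBranch : MAIN_BRANCH;
  }

  /**
   * Models a resolution that also consults the WAP settings.
   * Assumption: an explicit branch in the table name wins over the WAP branch.
   */
  public static String withWapFallback(
      String identifierBranch, Map<String, String> tableProps, Map<String, String> sessionConf) {
    if (identifierBranch != null) {
      return identifierBranch;
    }
    boolean wapEnabled = Boolean.parseBoolean(tableProps.getOrDefault(WAP_ENABLED, "false"));
    String wapBranch = sessionConf.get(WAP_BRANCH);
    return wapEnabled && wapBranch != null ? wapBranch : MAIN_BRANCH;
  }

  public static void main(String[] args) {
    Map<String, String> tableProps = Map.of(WAP_ENABLED, "true");
    Map<String, String> sessionConf = Map.of(WAP_BRANCH, "audit_branch");

    // DELETE FROM nyc.permits carries no branch in the table name, so the identifier branch is null.
    System.out.println(identifierBranchOnly(null)); // prints "main"
    System.out.println(withWapFallback(null, tableProps, sessionConf)); // prints "audit_branch"
  }
}
```

With no branch in the table name, the first method falls back to `main`, which matches the data loss seen in the second example, while the second method picks up `audit_branch` from the session configuration.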
   
   # How to reproduce
   
   Adding the below test to `TestDelete` reproduces the issue:
   
   ```
   @Test
     public void testDeleteToCustomWapBranchWithoutWhereClause()
         throws NoSuchTableException {
       Assume.assumeTrue(
           "Run only if custom WAP branch is not main",
           branch != null && !branch.equals(SnapshotRef.MAIN_BRANCH));
   
       createAndInitPartitionedTable();
       sql(
           "ALTER TABLE %s SET TBLPROPERTIES ('%s' = 'true', 'format-version' = '2')",
           tableName, TableProperties.WRITE_AUDIT_PUBLISH_ENABLED);
       append(tableName, new Employee(0, "hr"), new Employee(1, "hr"), new Employee(2, "hr"));
       createBranchIfNeeded();
   
       withSQLConf(
           ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch),
           () -> {
             sql("DELETE FROM %s t WHERE id=1", tableName);
             Assertions.assertThat(spark.table(tableName).count()).isEqualTo(2L);
           });
   
       // delete without WHERE clause should not delete from main
       withSQLConf(
           ImmutableMap.of(SparkSQLProperties.WAP_BRANCH, branch),
           () -> {
             sql("DELETE FROM %s t", tableName);
             Assertions.assertThat(spark.table(tableName).count()).isEqualTo(0L);
           });
     }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

