SaurabhChawla100 opened a new issue #3559:
URL: https://github.com/apache/iceberg/issues/3559


   In SparkMergeScan (used for MERGE and UPDATE SQL queries), snapshotId is assigned using the code below:
   
   ``` 
     Snapshot currentSnapshot = table.currentSnapshot();
     this.snapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
   ```
   This works fine when a single Spark job both updates and reads the table. It breaks down when multiple Spark jobs run merge / update on the same table.
   
   Scenario -> Two continuously running Spark jobs execute merge commands against the same table at different points in time.
   
   Both Spark jobs work on the same table -> testIceberg, with CurrentSnapshotId=P
   
   Step 1: Continuously running Spark job 1 -> The merge command is executed in the first Spark job and updates the table (testIceberg) -> now CurrentSnapshotId=X
   
   Step 2: Continuously running Spark job 2 -> After the first job's merge/update is done, a trigger fires and job 2 updates the same table based on some condition. Since job 2 is running for the first time, it loads the table (testIceberg) with the updated CurrentSnapshotId=X. After its update the snapshot becomes CurrentSnapshotId=Y. Job 2 then sends a trigger for Spark job 1 to run its final step on the same table (testIceberg).
   
   Step 3: Continuously running Spark job 1 -> In this step Spark job 1 still holds the table (testIceberg) at CurrentSnapshotId=X when SparkMergeScan is called, so it tries to update the table based on an old snapshot of the data. After all the task compute is done, MergingSnapshotProducer checks whether the snapshot the processing was based on is still the current snapshot. It is not, so this throws the error
   `Found conflicting files that can contain records matching`
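
   The three steps can be replayed with a small toy model. This is only a sketch under stated assumptions: `ToyTable`, `JobHandle` and `finalMergeConflicts` are hypothetical names, not Iceberg classes, and the commit check here is a plain snapshot-id comparison, which is stricter than the file-level validation that produces the real error. It only illustrates why a handle that captured X is rejected once the table has moved on to Y.

   ```java
   // Toy model only: ToyTable and JobHandle are hypothetical stand-ins, not Iceberg classes.
   class StaleSnapshotDemo {

       /** Shared table state, standing in for the catalog's view of testIceberg. */
       static class ToyTable {
           private long currentSnapshotId;

           ToyTable(long initialSnapshotId) {
               this.currentSnapshotId = initialSnapshotId;
           }

           synchronized long currentSnapshotId() {
               return currentSnapshotId;
           }

           /** Commits a new snapshot only if the writer planned against the current one. */
           synchronized long commit(long baseSnapshotId) {
               if (baseSnapshotId != currentSnapshotId) {
                   throw new IllegalStateException(
                       "Found conflicting changes: planned on snapshot " + baseSnapshotId
                           + ", current snapshot is " + currentSnapshotId);
               }
               currentSnapshotId += 1;
               return currentSnapshotId;
           }
       }

       /** A long-running job's cached handle; it never re-reads the table on its own. */
       static class JobHandle {
           private final ToyTable table;
           private long cachedSnapshotId;

           JobHandle(ToyTable table) {
               this.table = table;
               this.cachedSnapshotId = table.currentSnapshotId(); // captured once, at load time
           }

           /** Runs a merge against the cached snapshot id and commits the result. */
           long merge() {
               cachedSnapshotId = table.commit(cachedSnapshotId);
               return cachedSnapshotId;
           }
       }

       /** Replays steps 1 to 3; returns true if job 1's final merge is rejected. */
       static boolean finalMergeConflicts() {
           ToyTable table = new ToyTable(100L);    // snapshot P
           JobHandle job1 = new JobHandle(table);
           job1.merge();                           // step 1: P -> X
           JobHandle job2 = new JobHandle(table);  // first load, sees X
           job2.merge();                           // step 2: X -> Y
           try {
               job1.merge();                       // step 3: job 1 still holds X
               return false;
           } catch (IllegalStateException e) {
               return true;
           }
       }

       public static void main(String[] args) {
           System.out.println("final merge conflicts: " + finalMergeConflicts());
       }
   }
   ```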
   
   To handle this scenario, I refresh the table before any update or merge action on it, using the code below in the Spark job:
    
   ```
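   import org.apache.iceberg.catalog.TableIdentifier

   val conf = spark.sessionState.newHadoopConf()
   val catalog = new org.apache.iceberg.hive.HiveCatalog(conf)
   val tableIdf = TableIdentifier.of("default", "testIceberg")
   // Load the table by the identifier defined above
   val table = catalog.loadTable(tableIdf)
   // Re-read the table metadata so currentSnapshot() reflects the latest commit
   table.refresh()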
   ```
   
   With the above code the snapshot is updated to the latest value, and I am able to run merge / update in both Spark jobs.
   
   Issues with the current code
   1) In the current code, the conflict check on the snapshot ID happens only after all the compute is done and the data is saved in the data location. If there is a conflict, all the saved data is deleted. If the job had failed earlier, in SparkMergeScan itself, that compute would have been saved.
   
   2) If we call table.refresh in SparkMergeScan, we get the latest currentSnapshot of the table.
   
   ``` 
     table.refresh();
     Snapshot currentSnapshot = table.currentSnapshot();
     this.snapshotId = currentSnapshot != null ? currentSnapshot.snapshotId() : null;
   ```
   After adding this table.refresh() call, I am able to run the scenario discussed above without failure.
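
   The difference between the two points can be made concrete with a toy model. It is a sketch, not Iceberg code: `ToyTable`, `MergeJob` and the three `merge...` methods are hypothetical names, and `computeRuns` simply counts how often the expensive step ran. It contrasts validating only at commit time, checking the snapshot id before computing, and re-reading the snapshot id at scan time.

   ```java
   // Toy model only: ToyTable and MergeJob are hypothetical stand-ins, not Iceberg classes.
   class FailFastSketch {

       static class ToyTable {
           private long currentSnapshotId;

           ToyTable(long id) {
               this.currentSnapshotId = id;
           }

           long currentSnapshotId() {
               return currentSnapshotId;
           }

           /** Commits only if the writer's base snapshot is still current. */
           void commit(long baseSnapshotId) {
               if (baseSnapshotId != currentSnapshotId) {
                   throw new IllegalStateException("stale base snapshot " + baseSnapshotId);
               }
               currentSnapshotId += 1;
           }
       }

       static class MergeJob {
           int computeRuns = 0; // counts how often the expensive step ran

           /** Validation only at commit time: compute first, discover the conflict last. */
           boolean mergeValidateLate(ToyTable table, long cachedSnapshotId) {
               computeRuns++;
               try {
                   table.commit(cachedSnapshotId);
                   return true;
               } catch (IllegalStateException e) {
                   return false; // the computed output would be discarded here
               }
           }

           /** Point 1: compare snapshot ids before computing and fail without doing the work. */
           boolean mergeFailFast(ToyTable table, long cachedSnapshotId) {
               if (cachedSnapshotId != table.currentSnapshotId()) {
                   return false;
               }
               computeRuns++;
               table.commit(cachedSnapshotId);
               return true;
           }

           /** Point 2: re-read the current snapshot id at scan time, then compute and commit. */
           boolean mergeWithRefresh(ToyTable table) {
               long base = table.currentSnapshotId(); // stands in for table.refresh()
               computeRuns++;
               table.commit(base);
               return true;
           }
       }

       public static void main(String[] args) {
           ToyTable table = new ToyTable(2L); // another job already moved the table to snapshot 2
           long stale = 1L;                   // this job still holds snapshot 1

           MergeJob late = new MergeJob();
           System.out.println(late.mergeValidateLate(table, stale) + " after " + late.computeRuns + " compute run(s)");

           MergeJob fast = new MergeJob();
           System.out.println(fast.mergeFailFast(table, stale) + " after " + fast.computeRuns + " compute run(s)");

           MergeJob refreshed = new MergeJob();
           System.out.println(refreshed.mergeWithRefresh(table) + " after " + refreshed.computeRuns + " compute run(s)");
       }
   }
   ```

   Even with an early check or a refresh, another job could still commit while the compute is running, so a commit-time validation would presumably remain necessary; the early check only avoids work that is already known to be wasted.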
   
   Steps to reproduce with the two Spark jobs
   
   1) Create table 
   ```
   spark.sql("""CREATE EXTERNAL TABLE `testIceberg`(
           `organizationid` string, 
           `userid` string,  
           `trxdate` string)
           USING iceberg
         PARTITIONED BY (trxdate)""")
   ``` 
   2) Insert some records 
   ```
   spark.sql("insert into testIceberg values ('org1', 'userid1', '20211108')")
   spark.sql("insert into testIceberg values ('org1', 'userid2', '20211108')")
   spark.sql("insert into testIceberg values ('org2', 'userid2', '20211109')")
   ```
   
   3) Create some source data and merge in Spark job 1
   ```
   case class Test(organizationid: String, userId: String, trxdate:String)
   
   Seq(Test("org21", "userid21", "20211115"), Test("org221", "userid221", "20211119"))
     .toDF.createOrReplaceTempView("test12")
   
   spark.sql("""MERGE into testIceberg1 t USING (SELECT * from test12) s
     on t.userid = s.userid and s.trxdate='202111144'
     when matched then update set t.trxdate='20211111'
     WHEN NOT MATCHED THEN insert (organizationid, userid, trxdate)
       values (s.organizationid, s.userid , '20211113')""")
   
   ```
   4) Create some source data and merge in Spark job 2
   ```
   case class Test(organizationid: String, userId: String, trxdate:String)
   
   Seq(Test("org21", "userid21", "20211115"), Test("org221", "userid221", "20211119"))
     .toDF.createOrReplaceTempView("test12")
   
   spark.sql("""MERGE into testIceberg2 t USING (SELECT * from test12) s
     on t.userid = s.userid and t.trxdate='20211119'
     when matched then update set t.trxdate='20211111'
     WHEN NOT MATCHED THEN insert (organizationid, userid, trxdate)
       values (s.organizationid, s.userid , '20211120')""")
   ```
   
   5) Now run the merge command again in Spark job 1
   ```
   spark.sql("""MERGE into testIceberg1 t USING (SELECT * from test12) s
     on t.userid = s.userid and s.trxdate='202111144'
     when matched then update set t.trxdate='20211111'
     WHEN NOT MATCHED THEN insert (organizationid, userid, trxdate)
       values (s.organizationid, s.userid , '20211113')""")
   ```
   
   6) Spark job 1 now fails with the exception below
   ```
   Caused by: org.apache.iceberg.exceptions.ValidationException: Found conflicting files that can contain records matching true: [/user/hive/warehouse/testIceberg2/data/trxdate=20211120/00057-611-5a6c447a-ff21-4e76-9fda-327f613ca2b4-00001.parquet, /user/hive/warehouse/testIceberg2/data/trxdate=20211120/00192-612-4665e74d-2fb1-4faa-b2f4-9097eaa405f0-00001.parquet]
     at org.apache.iceberg.MergingSnapshotProducer.validateAddedDataFiles(MergingSnapshotProducer.java:277)
   ```
   
   
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


