abmo-x opened a new pull request, #5311:
URL: https://github.com/apache/iceberg/pull/5311

   Related to https://github.com/apache/iceberg/issues/5310 and 
https://github.com/apache/iceberg/issues/4168
   
   **Issue**
   
   When S3OutputStream fails to upload a file on the call to **close** due to some failure, IcebergStreamWriter in Flink still ends up adding the file to completedDataFiles in **BaseTaskWriter**, resulting in table metadata that points to an S3 data file which was never actually uploaded to S3.
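
   To make the failure mode concrete, below is a minimal, runnable stand-in for the close() pattern involved. It is a paraphrase for illustration only, not the actual S3OutputStream source: because the closed flag is set before the upload attempt, a second close() after a failed first attempt returns silently, so the caller sees a "successful" close for a file that never reached S3.

```java
import java.io.IOException;

// Illustration only: a stand-in that mimics the flag-guarded close() pattern.
// This is NOT the Iceberg source, just a sketch of the failure mode.
class FlagGuardedStream {
  private boolean closed = false;

  public void close() throws IOException {
    if (closed) {
      // Second close() after a failed first attempt returns here, so the
      // caller believes the data was uploaded successfully.
      return;
    }
    closed = true;
    completeUploads(); // throws if the S3 upload (putObject/multipart) fails
  }

  private void completeUploads() throws IOException {
    throw new IOException("simulated putObject failure");
  }

  public static void main(String[] args) {
    FlagGuardedStream stream = new FlagGuardedStream();
    try {
      stream.close(); // first close: upload fails, exception thrown
    } catch (IOException e) {
      // swallowed upstream, e.g. by a ProcessFunction that ignores exceptions
    }
    try {
      stream.close(); // second close: silent no-op, looks like success
      System.out.println("second close() 'succeeded'; file treated as complete");
    } catch (IOException e) {
      // never reached
    }
  }
}
```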
   
   **Steps to Reproduce**
   
   -  Flink 1.14 pipeline with Iceberg 0.13
   -  A customer-implemented **ProcessFunction<FlinkRecord, Row>** that ignores all exceptions in **processElement** (a sketch of such a function follows this list)
         - This is important because it is what leads to close() being called twice, from:
            - [shouldRollToNewFile](https://github.com/apache/iceberg/blob/bf6242fe57605a7b38b9d01ee33ae325687fb3a5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L257) --> [closeCurrent](https://github.com/apache/iceberg/blob/bf6242fe57605a7b38b9d01ee33ae325687fb3a5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L288)
            - [close](https://github.com/apache/iceberg/blob/bf6242fe57605a7b38b9d01ee33ae325687fb3a5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L309)
   
   - Configure the pipeline to use S3FileIO, with the target file size set according to your test data so that the writer will roll to a new file
   - Induce an S3 failure on the putObject call (should be reproducible for multipart upload as well) during the call to [shouldRollToNewFile](https://github.com/apache/iceberg/blob/bf6242fe57605a7b38b9d01ee33ae325687fb3a5/core/src/main/java/org/apache/iceberg/io/BaseTaskWriter.java#L257), which calls [close --> completeUploads](https://github.com/apache/iceberg/blob/c8f93dfad1925c7192d2d01bbfd8b790b364b54e/aws/src/main/java/org/apache/iceberg/aws/s3/S3OutputStream.java#L242); one possible way to inject this failure is sketched after this list
 
   - The pipeline keeps running despite the above failure; then the [snapshot barrier](https://github.com/apache/iceberg/blob/3584c79022ec70f79b326550736b4600d249e4a2/flink/v1.15/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java#L65) is triggered
     - This calls close() again and ends up adding the data file that was never uploaded to S3
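
   Below is a hedged sketch of the kind of exception-swallowing ProcessFunction described above (the class name, the FlinkRecord stub, and the row conversion are placeholders, not the customer's actual code). With operator chaining, the downstream IcebergStreamWriter runs inside out.collect(), so the exception from the failed close() surfaces here and is ignored, which keeps the pipeline running.

```java
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.types.Row;
import org.apache.flink.util.Collector;

// Placeholder for the customer's input record type mentioned in the report.
class FlinkRecord {
  String payload;
}

// Sketch of a user function that ignores all exceptions in processElement.
public class SwallowingProcessFunction extends ProcessFunction<FlinkRecord, Row> {

  @Override
  public void processElement(FlinkRecord record, Context ctx, Collector<Row> out) {
    try {
      // With operator chaining, the chained IcebergStreamWriter processes the
      // row inside this collect() call, so its failure propagates back here...
      out.collect(Row.of(record.payload));
    } catch (Exception e) {
      // ...and is ignored, masking the failed S3 upload and leaving the
      // half-closed writer in place until the next checkpoint.
    }
  }
}
```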
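
   One possible way to inject the putObject failure in a test (an assumption on my part, not necessarily how the added unit tests do it) is to hand S3FileIO a mocked S3Client whose putObject always throws; for data below the multipart threshold the upload only happens when the stream is closed, so close() fails just as in the reproduction above. The bucket and key below are made up.

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import org.apache.iceberg.aws.s3.S3FileIO;
import org.apache.iceberg.io.OutputFile;
import org.apache.iceberg.io.PositionOutputStream;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.core.sync.RequestBody;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;

public class S3PutObjectFailureSketch {

  public static void main(String[] args) {
    // Mocked client: every putObject fails, simulating the S3-side failure.
    S3Client failingClient = mock(S3Client.class);
    when(failingClient.putObject(any(PutObjectRequest.class), any(RequestBody.class)))
        .thenThrow(SdkClientException.create("simulated putObject failure"));

    S3FileIO io = new S3FileIO(() -> failingClient);
    OutputFile outputFile = io.newOutputFile("s3://test-bucket/data/00000-0-test.parquet");

    try (PositionOutputStream stream = outputFile.create()) {
      stream.write(new byte[] {1, 2, 3});
      // Closing (via try-with-resources) triggers completeUploads -> putObject,
      // which throws; a later close() on the same stream would be a no-op.
    } catch (IOException | RuntimeException e) {
      System.out.println("upload failed as expected: " + e.getMessage());
    }
  }
}
```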
   
   **Testing**
   - Unit tests added
   - Testing on our dev pipeline; I will update with the results after the pipeline has run for a while
      

