RussellSpitzer opened a new issue, #4686:
URL: https://github.com/apache/iceberg/issues/4686

   This issue was pointed out by @stevenzwu while we were discussing incorrect aborts when non-runtime exceptions were thrown. He correctly noticed that the same sort of problem would appear if a commit unknown exception (`CommitStateUnknownException`) were thrown and the Spark Writer aborted and deleted the underlying data files.
   
   cc: @stevenzwu @rdblue @aokolnychyi @flyrain
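   
   In other words, the dangerous sequence is: the commit actually succeeds, an exception still reaches the writer, and the writer's abort path deletes data files that the committed metadata now references. A minimal sketch of that pattern, using hypothetical names rather than the actual `SparkWrite` code:
   
    ```java
    // Hypothetical writer-side commit handling, for illustration only.
    class UnsafeCommitHandling {
      void commitOrAbort(Runnable commit, Runnable deleteWrittenFiles) {
        try {
          commit.run();
        } catch (RuntimeException e) {
          // BUG: if this is a CommitStateUnknownException, the commit may have
          // succeeded on the catalog side; deleting the written data files then
          // breaks the table, because committed metadata references them.
          deleteWrittenFiles.run();
          throw e;
        }
      }
    }
    ```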
   
   ## Reproduction
   
   I tried for a while to see if I could mock this, but I was unable to. Instead, the failure can be reproduced directly:
   
   Modify 
   https://github.com/apache/iceberg/blob/f6e11148d31b408a7aea57a0efcb4428134f6a99/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L181
   
   and add the following as the last line of that method:
   ```java
        // Simulate an exception surfacing after the commit has actually succeeded
        throw new CommitFailedException("Fun failure");
   ```
   
   This means that our metadata will be properly committed and the data files written, but the writer will still see the commit as a failure and run its abort path.
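   
   For orientation, the patched method would look roughly like the sketch below; the real body of `commitOperation` in `SparkWrite` has more detail, and only the position of the injected throw matters:
   
    ```java
    // Sketch only -- the actual SparkWrite.commitOperation body differs in detail.
    private void commitOperation(SnapshotUpdate<?> operation, String description) {
      operation.commit();                             // the commit itself succeeds...
      throw new CommitFailedException("Fun failure"); // ...but the writer sees a failure
    }
    ```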
   
   Following this, I added a small test:
   
   ```java
      // Assumes the surrounding test class provides temp, format, CONF, SCHEMA, and spark.
      @Test
      public void testCommitUnknownException() throws IOException {
        File parent = temp.newFolder(format.toString());
        File location = new File(parent, "commitunknown");

        HadoopTables tables = new HadoopTables(CONF);
        PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
        Table table = tables.create(SCHEMA, spec, location.toString());

        List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
        for (int i = 0; i < 2000; i++) {
          expected.add(new SimpleRecord(i, "a"));
          expected.add(new SimpleRecord(i, "b"));
          expected.add(new SimpleRecord(i, "c"));
          expected.add(new SimpleRecord(i, "d"));
        }

        Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

        try {
          df.select("id", "data").sort("data").write()
            .format("iceberg")
            .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
            .mode(SaveMode.Append)
            .save(location.toString());
        } catch (Throwable t) {
          // The injected CommitFailedException is expected here.
          System.out.println("Error Expected");
        }

        long count = spark.read().format("iceberg").load(location.toString()).count();
        // JUnit's assertEquals takes (expected, actual).
        Assert.assertEquals(8000, count);
      }
   ```
   
   This test fails because the abort deleted data files that the successful commit still references:
   
   ```java
    [Executor task launch worker for task 0.0 in stage 4.0 (TID 5)] ERROR org.apache.iceberg.spark.source.BaseDataReader - Error reading file: /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet
    org.apache.iceberg.exceptions.RuntimeIOException: Failed to get status for file: /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet
        at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:161)
        at org.apache.iceberg.hadoop.HadoopInputFile.getStat(HadoopInputFile.java:195)
        at org.apache.iceberg.parquet.ParquetIO.file(ParquetIO.java:54)
        at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:218)
        at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:74)
        at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
        at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
        at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:38)
        at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:35)
        at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:73)
        at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:77)
        at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:107)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:93)
        at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:130)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_1$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:834)
    Caused by: java.io.FileNotFoundException: File /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
        at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:159)
        ... 34 more
   ```
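   
   One possible direction for a fix, sketched below as an assumption rather than the actual patch: have the writer treat `CommitStateUnknownException` specially and skip file cleanup, since the commit may have succeeded and the written files may already be referenced by table metadata.
   
    ```java
    import org.apache.iceberg.exceptions.CommitStateUnknownException;
    
    // Illustrative sketch only: skip cleanup when the commit outcome is unknown.
    class SaferCommitHandling {
      void commitOrAbort(Runnable commit, Runnable deleteWrittenFiles) {
        try {
          commit.run();
        } catch (CommitStateUnknownException e) {
          // The commit may have succeeded, so the written files could be live.
          // Propagate the error without deleting anything.
          throw e;
        } catch (RuntimeException e) {
          // The commit definitely failed; cleaning up is safe.
          deleteWrittenFiles.run();
          throw e;
        }
      }
    }
    ```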

