RussellSpitzer opened a new issue, #4686:
URL: https://github.com/apache/iceberg/issues/4686
The issue here was pointed out by @stevenzwu while we were discussing the issue with incorrect aborts when non-runtime exceptions were thrown. He correctly noticed that the same sort of issue would appear if we threw a commit-unknown exception: the Spark writer would abort and delete the underlying data files, even though the commit may actually have succeeded.

cc: @stevenzwu @rdblue @aokolnychyi @flyrain

## Reproduction

I tried for a while to see if I could mock this, but I was unable to. Instead, modify

https://github.com/apache/iceberg/blob/f6e11148d31b408a7aea57a0efcb4428134f6a99/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java#L181

and add the following as the last line of that method:

```java
throw new CommitFailedException("Fun failure");
```

This means the metadata is properly committed and the data files are written before the exception is thrown.
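Concretely, the modified method ends up looking roughly like this. This is a paraphrased sketch, not the exact source: the method's logging and snapshot-metadata handling are elided, and only the final `throw` is the change.

```java
// Paraphrased sketch (not the exact source): SparkWrite#commitOperation with
// the injected failure appended; logging and snapshot-metadata handling elided.
private void commitOperation(SnapshotUpdate<?> operation, String description) {
  operation.commit(); // the commit succeeds: metadata now references the new data files
  // Injected failure, thrown *after* the successful commit, so the driver
  // sees a failed write and aborts, deleting files the table now depends on.
  throw new CommitFailedException("Fun failure");
}
```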
Following this, I added a small test:

```java
@Test
public void testCommitUnknownException() throws IOException {
  File parent = temp.newFolder(format.toString());
  File location = new File(parent, "commitunknown");

  HadoopTables tables = new HadoopTables(CONF);
  PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
  Table table = tables.create(SCHEMA, spec, location.toString());

  List<SimpleRecord> expected = Lists.newArrayListWithCapacity(8000);
  for (int i = 0; i < 2000; i++) {
    expected.add(new SimpleRecord(i, "a"));
    expected.add(new SimpleRecord(i, "b"));
    expected.add(new SimpleRecord(i, "c"));
    expected.add(new SimpleRecord(i, "d"));
  }

  Dataset<Row> df = spark.createDataFrame(expected, SimpleRecord.class);

  // leftover from the earlier (unsuccessful) attempt to mock this; unused
  SparkTable sparkTable = spy(new SparkTable(table, true));

  try {
    df.select("id", "data").sort("data").write()
        .format("iceberg")
        .option(SparkWriteOptions.WRITE_FORMAT, format.toString())
        .mode(SaveMode.Append)
        .save(location.toString());
  } catch (Throwable t) {
    // the injected exception is expected; the commit itself succeeded
    System.out.println("Error Expected");
  }

  // the committed snapshot should still be readable with all 8000 rows
  long count = spark.read().format("iceberg").load(location.toString()).count();
  Assert.assertEquals(8000, count);
}
```

This test fails because the data files referenced by the committed metadata have been deleted:

```java
[Executor task launch worker for task 0.0 in stage 4.0 (TID 5)] ERROR org.apache.iceberg.spark.source.BaseDataReader - Error reading file: /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet
org.apache.iceberg.exceptions.RuntimeIOException: Failed to get status for file: /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet
    at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:161)
    at org.apache.iceberg.hadoop.HadoopInputFile.getStat(HadoopInputFile.java:195)
    at org.apache.iceberg.parquet.ParquetIO.file(ParquetIO.java:54)
    at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:218)
    at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:74)
    at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
    at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
    at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:38)
    at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:35)
    at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:73)
    at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:77)
    at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:107)
    at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:93)
    at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:130)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_1$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.agg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:349)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.io.FileNotFoundException: File /var/folders/yl/6cwgks7919s1td2mfdq86cbm0000gn/T/junit11371694369070946472/PARQUET/commitunknown/data/data=a/00000-4-66cc0b01-7df4-4f52-bfe0-21af515cdf0b-00001.parquet does not exist
    at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
    at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
    at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
    at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:159)
    ... 34 more
```
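The expected behavior would be for the writer to skip deleting data files whenever it cannot tell whether the commit went through. Below is a minimal sketch of that idea only, not a patch: the `CommitGuard` class, the `cleanupOnAbort` flag, and the `Runnable` hooks are hypothetical illustrations rather than existing Iceberg APIs; only `CommitStateUnknownException` is the real `org.apache.iceberg.exceptions` class.

```java
import org.apache.iceberg.exceptions.CommitStateUnknownException;

// Sketch of the idea only, not a patch: remember whether cleanup is safe,
// and skip deleting data files on abort when the commit outcome is unknown.
class CommitGuard {
  private boolean cleanupOnAbort = true; // hypothetical flag

  void commit(Runnable doCommit) {
    try {
      doCommit.run();
    } catch (CommitStateUnknownException e) {
      // The catalog may have applied the commit; deleting data files now
      // could corrupt a committed snapshot. Leave them in place instead.
      cleanupOnAbort = false;
      throw e;
    }
  }

  void abort(Runnable deleteDataFiles) {
    if (cleanupOnAbort) {
      deleteDataFiles.run(); // only safe when the commit definitely failed
    }
  }
}
```

Whatever shape an actual fix takes in `SparkWrite`, the key property is that an ambiguous commit must never be treated as a definite failure.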
