prodeezy commented on issue #4666:
URL: https://github.com/apache/iceberg/issues/4666#issuecomment-1112835421

   
   Code to reproduce:
   
   Step 1: Add a `throw new RuntimeException("FATAL")` anywhere in https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L328-L346
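   For illustration, the injected failure is just an uncaught exception in that commit path; a minimal sketch (the exact statement and its position within the linked range are arbitrary, any uncaught exception there will do):
   
   ```
   // Inserted into SnapshotProducer purely to simulate the commit failing for this repro.
   throw new RuntimeException("FATAL");
   ```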
   
   Step 2: 
   Run this in one process: 
   ```
   // Imports assume the Iceberg Spark test classpath; SimpleRecord is the
   // existing test helper class from the iceberg-spark tests.
   import java.io.File;
   import java.io.IOException;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;

   import org.apache.hadoop.conf.Configuration;
   import org.apache.iceberg.PartitionSpec;
   import org.apache.iceberg.Schema;
   import org.apache.iceberg.Table;
   import org.apache.iceberg.TableProperties;
   import org.apache.iceberg.hadoop.HadoopTables;
   import org.apache.iceberg.relocated.com.google.common.collect.Lists;
   import org.apache.iceberg.spark.source.SimpleRecord;
   import org.apache.iceberg.types.Types;
   import org.apache.spark.sql.Dataset;
   import org.apache.spark.sql.Row;
   import org.apache.spark.sql.SparkSession;
   import org.junit.AfterClass;
   import org.junit.BeforeClass;
   import org.junit.Rule;
   import org.junit.Test;
   import org.junit.rules.TemporaryFolder;

   public class TestDataLossReproCase {
       private static final Configuration CONF = new Configuration();
       private final String format = "parquet";
       private static SparkSession spark = null;

       @Rule
       public TemporaryFolder temp = new TemporaryFolder();

       @BeforeClass
       public static void startSpark() {
           TestDataLossReproCase.spark = SparkSession.builder().master("local[2]").getOrCreate();
       }

       @AfterClass
       public static void stopSpark() {
           SparkSession currentSpark = TestDataLossReproCase.spark;
           TestDataLossReproCase.spark = null;
           currentSpark.stop();
       }

       public TestDataLossReproCase() { }

       @Test
       public void testWriteBatchWithFailure() throws IOException {
           File parent = temp.newFolder(format);
           File location = new File(parent, "test-oom");

           HadoopTables tables = new HadoopTables(CONF);
           Schema SCHEMA = new Schema(
                   Types.NestedField.optional(1, "id", Types.IntegerType.get()),
                   Types.NestedField.optional(2, "data", Types.StringType.get())
           );
           PartitionSpec spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
           Map<String, String> tableProps = new HashMap<>();
           tableProps.put("write.format.default", "parquet");
           tableProps.put(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true");
           Table table = tables.create(SCHEMA, spec, tableProps, location.toString());

           List<SimpleRecord> records = Lists.newArrayList(
                   new SimpleRecord(1, "a"),
                   new SimpleRecord(2, "b"),
                   new SimpleRecord(3, "c")
           );
           Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

           // Stage the write as a WAP snapshot and tag it so it can be identified later.
           spark.conf().set("spark.wap.id", "batch1");
           df.write()
                   .format("iceberg")
                   .mode("append")
                   .option("snapshot-property.marked-for-failure", "true")
                   .save(location.toString());
       }
   }
   ```
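   To confirm the state Step 2 leaves behind, here is a verification sketch (not part of the original repro, and the expectation is an assumption): because WAP is enabled and `spark.wap.id` is set, the snapshot is staged rather than published, so `currentSnapshot()` should still be null while `snapshots()` already lists the staged WAP snapshot. `CONF` and `location` refer to the values from the test above.
   
   ```
   // Verification sketch (assumed, not from the original repro): inspect the table
   // after the failed Step 2 write. Expectation: currentSnapshot() is null, but the
   // staged WAP snapshot is present in the snapshot list.
   Table table = new HadoopTables(CONF).load(location.toString());
   System.out.println("current snapshot: " + table.currentSnapshot());
   for (Snapshot s : table.snapshots()) {
       System.out.println("listed snapshot: " + s.snapshotId() + " " + s.summary());
   }
   ```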
   
   Step 3: 
   In a separate process / spark-shell, try to cherry-pick the same snapshot that failed. Iceberg allows the cherry-pick to be committed, but the snapshot has no data files. 
   
   ```
   scala> val wapSnapshot = table.snapshots().iterator().next()
   
   scala> table.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit();
   
   scala> table.currentSnapshot()
    res7: org.apache.iceberg.Snapshot = BaseSnapshot{id=3129787574451672289, timestamp_ms=1651198987172, operation=append, summary={spark.app.id=local-1651198931798, wap.id=batch1, marked-for-failure=true, added-data-files=3, added-records=3, added-files-size=1896, changed-partition-count=3, total-records=3, total-files-size=1896, total-data-files=3, total-delete-files=0, total-position-deletes=0, total-equality-deletes=0}, manifest-list=/var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom/metadata/snap-3129787574451672289-1-c71dcc2e-f838-4cff-bd16-de212c8db0ca.avro, schema-id=0}
   
   ```
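   The same cherry-pick expressed with the Java API, for reference (a sketch, assuming `table` is the same table loaded in the verification sketch above):
   
   ```
   // Java equivalent of the spark-shell cherry-pick above. This publishes the staged
   // WAP snapshot even though the data files it references no longer exist on disk
   // (see Step 4).
   Snapshot wapSnapshot = table.snapshots().iterator().next();
   table.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit();
   System.out.println(table.currentSnapshot());
   ```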
   
   Step 4:
   Reading the table now fails with a FileNotFoundException: 
   ```
   
   scala> spark.read.format("iceberg").load("file:///var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom").show()
   
   22/04/28 19:35:25 ERROR BaseDataReader: Error reading file: /var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom/data/data=a/00000-0-901adea9-8e0c-49f1-9a3e-d90239db993e-00001.parquet
   org.apache.iceberg.exceptions.RuntimeIOException: Failed to get status for file: /var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom/data/data=a/00000-0-901adea9-8e0c-49f1-9a3e-d90239db993e-00001.parquet
        at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:158)
        at org.apache.iceberg.hadoop.HadoopInputFile.getStat(HadoopInputFile.java:192)
        at org.apache.iceberg.parquet.ParquetIO.file(ParquetIO.java:54)
        at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:218)
        at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:74)
        at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
        at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
        at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:38)
        at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:35)
        at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:73)
        at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:95)
        at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:93)
        at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
        at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
        at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
        at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
        at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
        at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
        at org.apache.spark.scheduler.Task.run(Task.scala:131)
        at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
   Caused by: java.io.FileNotFoundException: File /var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom/data/data=a/00000-0-901adea9-8e0c-49f1-9a3e-d90239db993e-00001.parquet does not exist
        at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:666)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:987)
        at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:656)
        at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:454)
        at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:156)
   ```

