prodeezy commented on issue #4666:
URL: https://github.com/apache/iceberg/issues/4666#issuecomment-1112835421
Code to reproduce:
Step 1: Add a `throw new RuntimeException("FATAL")` anywhere in the commit path at
https://github.com/apache/iceberg/blob/master/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L328-L346
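For reference, the injection is just an unconditional throw inside that commit path (an illustrative fragment, not a complete patch; any line in the linked range works), simulating a writer that dies partway through the commit:
```
// Injected anywhere in SnapshotProducer.java#L328-L346 to simulate
// a fatal failure in the middle of the commit:
throw new RuntimeException("FATAL");
```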
Step 2:
Run this in one process:
```
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.spark.source.SimpleRecord;
import org.apache.iceberg.types.Types;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class TestDataLossReproCase {
  private static final Configuration CONF = new Configuration();
  private final String format = "parquet";
  private static SparkSession spark = null;

  @Rule
  public TemporaryFolder temp = new TemporaryFolder();

  @BeforeClass
  public static void startSpark() {
    TestDataLossReproCase.spark = SparkSession.builder().master("local[2]").getOrCreate();
  }

  @AfterClass
  public static void stopSpark() {
    SparkSession currentSpark = TestDataLossReproCase.spark;
    TestDataLossReproCase.spark = null;
    currentSpark.stop();
  }

  @Test
  public void testWriteBatchWithFailure() throws IOException {
    File parent = temp.newFolder(format);
    File location = new File(parent, "test-oom");
    HadoopTables tables = new HadoopTables(CONF);
    Schema schema = new Schema(
        Types.NestedField.optional(1, "id", Types.IntegerType.get()),
        Types.NestedField.optional(2, "data", Types.StringType.get())
    );
    PartitionSpec spec = PartitionSpec.builderFor(schema).identity("data").build();

    // Enable write-audit-publish so the write stages a snapshot instead of publishing it.
    Map<String, String> tableProps = new HashMap<>();
    tableProps.put("write.format.default", "parquet");
    tableProps.put(TableProperties.WRITE_AUDIT_PUBLISH_ENABLED, "true");
    Table table = tables.create(schema, spec, tableProps, location.toString());

    List<SimpleRecord> records = Lists.newArrayList(
        new SimpleRecord(1, "a"),
        new SimpleRecord(2, "b"),
        new SimpleRecord(3, "c")
    );
    Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);

    // Tag the write with a WAP id; the injected exception makes this commit fail.
    spark.conf().set("spark.wap.id", "batch1");
    df.write()
        .format("iceberg")
        .mode("append")
        .option("snapshot-property.marked-for-failure", "true")
        .save(location.toString());
  }
}
```
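At this point the write has failed (the injected exception aborted the commit) and the data files it wrote have been deleted by the failure-handling cleanup, yet the staged WAP snapshot remains reachable from the table metadata. A quick way to see it (a minimal sketch reusing the `tables` and `location` names from the test above):
```
// Reload the table and list the snapshots left behind by the failed commit.
Table reloaded = tables.load(location.toString());
for (org.apache.iceberg.Snapshot snap : reloaded.snapshots()) {
  // The orphaned snapshot still carries the WAP id set via spark.wap.id.
  System.out.println(snap.snapshotId() + " wap.id=" + snap.summary().get("wap.id"));
}
```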
Step 3:
In a separate process / spark-shell, try to cherry-pick the snapshot that failed.
Iceberg allows the cherry-pick to be committed even though the snapshot has no data files.
```
scala> // Load the table written by the failed run above (setup added for completeness).
scala> import org.apache.iceberg.hadoop.HadoopTables
scala> val table = new HadoopTables().load("/var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom")

scala> val wapSnapshot = table.snapshots().iterator().next()
scala> table.manageSnapshots().cherrypick(wapSnapshot.snapshotId()).commit()
scala> table.currentSnapshot()
res7: org.apache.iceberg.Snapshot = BaseSnapshot{id=3129787574451672289, timestamp_ms=1651198987172, operation=append, summary={spark.app.id=local-1651198931798, wap.id=batch1, marked-for-failure=true, added-data-files=3, added-records=3, added-files-size=1896, changed-partition-count=3, total-records=3, total-files-size=1896, total-data-files=3, total-delete-files=0, total-position-deletes=0, total-equality-deletes=0}, manifest-list=/var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom/metadata/snap-3129787574451672289-1-c71dcc2e-f838-4cff-bd16-de212c8db0ca.avro, schema-id=0}
```
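The cherry-pick commits cleanly even though every file it references was deleted by the failed commit's cleanup. One way to confirm this (a sketch, assuming the Iceberg 1.x `Snapshot.addedDataFiles(FileIO)` accessor; 0.13-era releases expose `addedFiles()` instead):
```
import org.apache.iceberg.DataFile;
import org.apache.iceberg.Table;
import org.apache.iceberg.hadoop.HadoopTables;

Table table = new HadoopTables()
    .load("/var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom");
for (DataFile file : table.currentSnapshot().addedDataFiles(table.io())) {
  // Every referenced data file is gone, so exists() returns false.
  System.out.println(file.path() + " exists=" + table.io().newInputFile(file.path().toString()).exists());
}
```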
Step 4:
Reading the table fails with a FileNotFoundException:
```
scala> spark.read.format("iceberg").load("file:///var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom").show()
22/04/28 19:35:25 ERROR BaseDataReader: Error reading file: /var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom/data/data=a/00000-0-901adea9-8e0c-49f1-9a3e-d90239db993e-00001.parquet
org.apache.iceberg.exceptions.RuntimeIOException: Failed to get status for file: /var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom/data/data=a/00000-0-901adea9-8e0c-49f1-9a3e-d90239db993e-00001.parquet
	at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:158)
	at org.apache.iceberg.hadoop.HadoopInputFile.getStat(HadoopInputFile.java:192)
	at org.apache.iceberg.parquet.ParquetIO.file(ParquetIO.java:54)
	at org.apache.iceberg.parquet.ReadConf.newReader(ReadConf.java:218)
	at org.apache.iceberg.parquet.ReadConf.<init>(ReadConf.java:74)
	at org.apache.iceberg.parquet.ParquetReader.init(ParquetReader.java:66)
	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:77)
	at org.apache.iceberg.parquet.ParquetReader.iterator(ParquetReader.java:38)
	at org.apache.iceberg.util.Filter.lambda$filter$0(Filter.java:35)
	at org.apache.iceberg.io.CloseableIterable$2.iterator(CloseableIterable.java:73)
	at org.apache.iceberg.spark.source.RowDataReader.open(RowDataReader.java:95)
	at org.apache.iceberg.spark.source.BaseDataReader.next(BaseDataReader.java:93)
	at org.apache.spark.sql.execution.datasources.v2.PartitionIterator.hasNext(DataSourceRDD.scala:79)
	at org.apache.spark.sql.execution.datasources.v2.MetricsIterator.hasNext(DataSourceRDD.scala:112)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:755)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:345)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:898)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:898)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:497)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1439)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:500)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File /var/folders/lt/3n80bsn133j1x4z79ffkm_940000gn/T/junit5193631189286247995/parquet/test-oom/data/data=a/00000-0-901adea9-8e0c-49f1-9a3e-d90239db993e-00001.parquet does not exist
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:666)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:987)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:656)
	at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:454)
	at org.apache.iceberg.hadoop.HadoopInputFile.lazyStat(HadoopInputFile.java:156)
```