thesquelched opened a new issue #2131:
URL: https://github.com/apache/iceberg/issues/2131


   It's possible to use the API to construct snapshots in such a way that
expiring a snapshot (with file deletion enabled) deletes data files that are
still active. This happens with an Iceberg table that's manually managed over
raw Parquet files written by Spark (doesn't really bear going into why). The
basic steps are:
   
   1. Create a partitioned iceberg table
   2. Write two partitions (`p1` and `p2`) as raw parquet data via spark
   3. Append files to iceberg table
   4. *IMPORTANT* Commit iceberg overwrite that
      1. Deletes files appended in step 3
      2. Re-adds those same files
   5. Expire snapshot 1 with file deletion enabled
   6. Write raw parquet data to a new directory containing data for partitions 
`p2` and `p3` (note that `p2` is the same partition as in step 2)
   7. Commit iceberg overwrite that
      1. Deletes files in snapshot 2 from partition `p2`
      2. Adds all new files from step 6
   8. Expire snapshot 2 with file deletion enabled
   9. Reading the iceberg table now fails because the files from `p1`, which 
are still active files, were deleted by the snapshot expiration in step 8
   
   Here's a script that shows how to reproduce:
   
   ```scala
   // vim: set ts=2 sw=2 bs=2 et
   
   // Use Vimux bindings to evaluate expressions in the shell pane. For example:
   //
   // * In normal mode, use \vs to evaluate the block under the cursor
   // * In (block) visual mode, use \vs to evaluate the highlighted block
   
   import java.sql.{Date,Timestamp}
   import java.time.LocalDate
   import org.apache.hadoop.fs._
   import org.apache.spark.sql._
   import org.apache.spark.sql.types._
   import org.apache.spark.sql.functions._
   import org.apache.spark.sql.expressions._
   import scala.collection.JavaConversions._
   import spark.implicits._
   import org.apache.iceberg.hadoop.{HadoopTables,HadoopInputFile}
   import org.apache.iceberg.spark.SparkSchemaUtil
   import org.apache.iceberg.parquet.ParquetUtil
   import org.apache.iceberg.{PartitionSpec,DataFiles,MetricsConfig}
   
   val tableDir = "hdfs:///tmp/iceberg-table"
   
   val fs = new Path(tableDir).getFileSystem(sc.hadoopConfiguration)
   fs.delete(new Path(tableDir), true)
   
   val tables = new HadoopTables(sc.hadoopConfiguration)
   
   // Create simple table partitioned by day
   val df1 = (
     spark
       .range(10L)
       .select(
         'id,
         concat(lit("value"), 'id) as 'value,
         when('id < 5L, Timestamp.valueOf("2021-01-19 00:00:00"))
           .otherwise(Timestamp.valueOf("2021-01-20 00:00:00")) as 'ts
       )
   )
   
   val schema = SparkSchemaUtil.convert(df1.schema)
   val table = tables.create(
     schema,
     PartitionSpec.builderFor(schema).day("ts").build,
     tableDir
   )
   
   // Get data files from a path
   def getDataFiles(path: String) = {
     fs
       .globStatus(new Path(path, "*/*.parquet"))
       .map({ status => HadoopInputFile.fromStatus(status, sc.hadoopConfiguration) })
       .map({ inputFile =>
         DataFiles
           .builder(table.spec)
           .withInputFile(inputFile)
           .withMetrics(ParquetUtil.fileMetrics(inputFile, MetricsConfig.getDefault))
           .withPartitionPath(new Path(inputFile.location).getParent.getName)
           .build
       })
       .toSeq
   }
   
   // Write dataframe as raw parquet
   (
     df1
       .withColumn("ts_day", date_format('ts, "yyyy-MM-dd"))
       .repartition(2)
       .sortWithinPartitions('ts)
       .write
       .partitionBy("ts_day")
       .mode("overwrite")
       .parquet(s"$tableDir/data/commit1")
   )
   
   // Append data files to iceberg table
   val dataFiles = getDataFiles(s"$tableDir/data/commit1")
   
   val append = table.newFastAppend
   dataFiles.foreach(append.appendFile)
   append.commit
   table.refresh
   
   // Table data appears OK
   spark.read.format("iceberg").load(tableDir).show
   
   // Issue an overwrite in which the appended data files are deleted, then
   // re-added
   val overwrite = table.newOverwrite
   dataFiles.foreach(overwrite.deleteFile)
   dataFiles.foreach(overwrite.addFile)
   overwrite.commit
   table.refresh
   
   // Table data appears OK
   spark.read.format("iceberg").load(tableDir).show
   
   // Expire first snapshot (append) with file cleanup enabled
   table.expireSnapshots.expireSnapshotId(table.snapshots.head.snapshotId).cleanExpiredFiles(true).commit
   table.refresh
   
   // Table data appears OK
   spark.read.format("iceberg").load(tableDir).show
   
   // Write new parquet data, with one new (2021-01-21) and one overwritten
   // (2021-01-20) partition
   (
     spark
       .range(5L, 15L)
       .select(
         'id,
         concat(lit("value"), 'id) as 'value,
         when('id < 10L, Timestamp.valueOf("2021-01-20 00:00:00"))
           .otherwise(Timestamp.valueOf("2021-01-21 00:00:00")) as 'ts
       )
       .withColumn("ts_day", date_format('ts, "yyyy-MM-dd"))
       .repartition(2)
       .sortWithinPartitions('ts)
       .write
       .partitionBy("ts_day")
       .mode("overwrite")
       .parquet(s"$tableDir/data/commit2")
   )
   
   // Do an overwrite that deletes the old files from the overwritten partition
   // (2021-01-20) and adds the files we just wrote for the overwritten and new
   // partitions
   val dataFiles2 = getDataFiles(s"$tableDir/data/commit2")
   
   // Named overwrite2 so the script also compiles when pasted as a single block
   val overwrite2 = table.newOverwrite
   (
     dataFiles
       .filter({ file =>
         LocalDate.ofEpochDay(file.partition.get(0, classOf[Integer]).toLong) == LocalDate.of(2021, 1, 20)
       })
       .foreach(overwrite2.deleteFile)
   )
   dataFiles2.foreach(overwrite2.addFile)
   overwrite2.commit
   table.refresh
   
   // Expire the second commit (the first overwrite)
   table.expireSnapshots.expireSnapshotId(table.snapshots.head.snapshotId).cleanExpiredFiles(true).commit
   table.refresh
   
   // Throws an exception because the files from the original commit for the
   // partition 2021-01-19 have been deleted, even though they were not affected
   // by the most recent overwrite
   spark.read.format("iceberg").load(tableDir).show
   ```
   
   Clearly there's user error here (we shouldn't be deleting and re-adding the
same files added in the previous snapshot), but it feels like Iceberg is doing
the wrong thing as well, since it deletes files that it still considers active.
It feels like the right solution is one of the following:
   
   1. Reject the commit in step 4 with an exception
   2. Warn the user that they're trying to both add and delete the same files 
and silently remove the affected files from the delete list
   3. Detect during the expiration that the files to be deleted are still 
active and prevent them from getting deleted
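
The three options above can be sketched as plain set operations over file paths. This is only an illustration and not Iceberg code: `SnapshotSafety` and all of its methods are hypothetical names, and the sketch assumes a data file can be identified by its path string.

```scala
// Hypothetical helpers for illustration only; none of this is Iceberg API.
// Data files are identified by their path strings (an assumption).
object SnapshotSafety {

  /** Paths that an overwrite would both delete and re-add. */
  def conflicting(toDelete: Set[String], toAdd: Set[String]): Set[String] =
    toDelete intersect toAdd

  /** Option 1: reject the overwrite if any path appears on both lists. */
  def validate(toDelete: Set[String], toAdd: Set[String]): Either[String, Unit] = {
    val both = conflicting(toDelete, toAdd)
    if (both.isEmpty) Right(())
    else Left(s"Files both deleted and added: ${both.toSeq.sorted.mkString(", ")}")
  }

  /** Option 2: drop the re-added paths from the delete list. */
  def pruneDeletes(toDelete: Set[String], toAdd: Set[String]): Set[String] =
    toDelete diff toAdd

  /** Option 3: at expiration time, keep any candidate that is still live,
    * i.e. still referenced by a snapshot that is being retained. */
  def safeToDelete(candidates: Set[String], live: Set[String]): Set[String] =
    candidates diff live
}

// Example with made-up paths: the overwrite deletes and re-adds the same file.
val deletes = Set("ts_day=2021-01-19/a.parquet")
val adds    = Set("ts_day=2021-01-19/a.parquet")
println(SnapshotSafety.validate(deletes, adds))
```

Applied to the reproduction, the overwrite in step 4 has identical delete and add lists, so `validate` would reject it and `pruneDeletes` would leave nothing to delete. For step 8, passing the paths referenced by the current snapshot as `live` would keep the `p1` files out of the deletion set.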

