thesquelched opened a new issue #2131:
URL: https://github.com/apache/iceberg/issues/2131
It's possible to use the API to construct snapshots in such a way that
expiring snapshots (with file deletion enabled) causes active data files to be
deleted. This happens with an iceberg table that's manually managed over raw
parquet files written by spark (doesn't really bear going into why). The basic
steps are:
1. Create a partitioned iceberg table
2. Write two partitions (`p1` and `p2`) as raw parquet data via spark
3. Append files to iceberg table
4. *IMPORTANT* Commit iceberg overwrite that:
   1. Deletes files appended in step 3
   2. Re-adds those same files
5. Expire snapshot 1 with file deletion enabled
6. Write raw parquet data to a new directory containing data for partitions `p2` and `p3` (note that `p2` is the same partition as in step 2)
7. Commit iceberg overwrite that:
   1. Deletes files in snapshot 2 from partition `p2`
   2. Adds all new files from step 6
8. Expire snapshot 2 with file deletion enabled
9. Reading the iceberg table now fails because the files from `p1`, which are still active files, were deleted by the snapshot expiration in step 8
Here's a script that shows how to reproduce:
```scala
// vim: set ts=2 sw=2 bs=2 et
// Use Vimux bindings to evaluate expressions in the shell pane. For example:
//
// * In normal mode, use \vs to evaluate the block under the cursor
// * In (block) visual mode, use \vs to evaluate the highlighted block
import java.sql.{Date,Timestamp}
import java.time.LocalDate
import org.apache.hadoop.fs._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions._
import scala.collection.JavaConversions._
import spark.implicits._
import org.apache.iceberg.hadoop.{HadoopTables,HadoopInputFile}
import org.apache.iceberg.spark.SparkSchemaUtil
import org.apache.iceberg.parquet.ParquetUtil
import org.apache.iceberg.{PartitionSpec,DataFiles,MetricsConfig}
val tableDir = "hdfs:///tmp/iceberg-table"
val fs = new Path(tableDir).getFileSystem(sc.hadoopConfiguration)
fs.delete(new Path(tableDir), true)
val tables = new HadoopTables(sc.hadoopConfiguration)
// Create simple table partitioned by day
val df1 = (
  spark
    .range(10L)
    .select(
      'id,
      concat(lit("value"), 'id) as 'value,
      when('id < 5L, Timestamp.valueOf("2021-01-19 00:00:00")).otherwise(Timestamp.valueOf("2021-01-20 00:00:00")) as 'ts
    )
)
val schema = SparkSchemaUtil.convert(df1.schema)
val table = tables.create(
  schema,
  PartitionSpec.builderFor(schema).day("ts").build,
  tableDir
)
// Get data files from a path
def getDataFiles(path: String) = {
  fs
    .globStatus(new Path(path, "*/*.parquet"))
    .map({ status => HadoopInputFile.fromStatus(status, sc.hadoopConfiguration) })
    .map({ inputFile =>
      DataFiles
        .builder(table.spec)
        .withInputFile(inputFile)
        .withMetrics(ParquetUtil.fileMetrics(inputFile, MetricsConfig.getDefault))
        .withPartitionPath(new Path(inputFile.location).getParent.getName)
        .build
    })
    .toSeq
}
// Write dataframe as raw parquet
(
  df1
    .withColumn("ts_day", date_format('ts, "yyyy-MM-dd"))
    .repartition(2)
    .sortWithinPartitions('ts)
    .write
    .partitionBy("ts_day")
    .mode("overwrite")
    .parquet(s"$tableDir/data/commit1")
)
// Append data files to iceberg table
val dataFiles = getDataFiles(s"$tableDir/data/commit1")
val append = table.newFastAppend
dataFiles.foreach(append.appendFile)
append.commit
table.refresh
// Table data appears OK
spark.read.format("iceberg").load(tableDir).show
// Issue an overwrite in which the appended data files are deleted, then re-added
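// (This is step 4 from the description above: deleting and re-adding the same
// files in one commit is the user error that sets up the bad expiration later.)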
val overwrite = table.newOverwrite
dataFiles.foreach(overwrite.deleteFile)
dataFiles.foreach(overwrite.addFile)
overwrite.commit
table.refresh
// Table data appears OK
spark.read.format("iceberg").load(tableDir).show
// Expire first snapshot (append) with file cleanup enabled
table.expireSnapshots.expireSnapshotId(table.snapshots.head.snapshotId).cleanExpiredFiles(true).commit
table.refresh
// Table data appears OK
spark.read.format("iceberg").load(tableDir).show
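// At this point the commit1 data files are referenced only by the overwrite
// snapshot, since the append snapshot that originally added them has been expired.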
// Write new parquet data, with one new (2021-01-21) and one overwritten (2021-01-20) partition
(
  spark
    .range(5L, 15L)
    .select(
      'id,
      concat(lit("value"), 'id) as 'value,
      when('id < 10L, Timestamp.valueOf("2021-01-20 00:00:00")).otherwise(Timestamp.valueOf("2021-01-21 00:00:00")) as 'ts
    )
    .withColumn("ts_day", date_format('ts, "yyyy-MM-dd"))
    .repartition(2)
    .sortWithinPartitions('ts)
    .write
    .partitionBy("ts_day")
    .mode("overwrite")
    .parquet(s"$tableDir/data/commit2")
)
// Do an overwrite that deletes the old files from the overwritten partition
// (2021-01-20) and adds the files we just wrote for the overwritten and new
// partitions
//
val dataFiles2 = getDataFiles(s"$tableDir/data/commit2")
val overwrite = table.newOverwrite
dataFiles
  .filter({ file =>
    LocalDate.ofEpochDay(file.partition.get(0, classOf[Integer]).toLong) == LocalDate.of(2021, 1, 20)
  })
  .foreach(overwrite.deleteFile)
dataFiles2.foreach(overwrite.addFile)
overwrite.commit
table.refresh
// Expire the second commit (the first overwrite)
table.expireSnapshots.expireSnapshotId(table.snapshots.head.snapshotId).cleanExpiredFiles(true).commit
table.refresh
// Throws an exception because the files from the original commit for the
// partition 2021-01-19 have been deleted, even though they were not affected
// by the most recent overwrite
spark.read.format("iceberg").load(tableDir).show
```
Clearly there's user error here (we shouldn't be deleting and re-adding the
same files that were added in the previous snapshot), but it feels like iceberg is
doing the wrong thing as well, since it deletes files that it still considers
active. It feels like the right solution is one of:
1. Reject the commit in step 4 with an exception
2. Warn the user that they're trying to both add and delete the same files, and
silently remove the affected files from the delete list (a caller-side sketch of
this is below)
3. Detect during the expiration that the files to be deleted are still active and
prevent them from getting deleted
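For what it's worth, the repro can be avoided on the caller's side with a small
pre-commit guard in the spirit of option 2. This is only a sketch (the
`safeOverwrite` helper below is hypothetical, not an existing Iceberg API): it
compares file paths and drops from the delete set anything that is also being
added, before committing the overwrite.
```scala
// Hypothetical caller-side guard (sketch, not an Iceberg API): never delete and
// re-add the same data file within a single overwrite commit.
import org.apache.iceberg.{DataFile, Table}

def safeOverwrite(table: Table, toDelete: Seq[DataFile], toAdd: Seq[DataFile]): Unit = {
  val addedPaths = toAdd.map(_.path.toString).toSet
  // Split the delete set into files that are also being re-added (no-ops) and real deletes
  val (noOps, realDeletes) = toDelete.partition(f => addedPaths.contains(f.path.toString))
  if (noOps.nonEmpty) {
    println(s"WARNING: dropping ${noOps.size} file(s) from the delete list because they are also being added")
  }
  val overwrite = table.newOverwrite
  realDeletes.foreach(overwrite.deleteFile)
  toAdd.foreach(overwrite.addFile)
  overwrite.commit
}
```
Throwing instead of warning would give the behaviour described in option 1.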