RussellSpitzer commented on issue #2131:
URL: https://github.com/apache/iceberg/issues/2131#issuecomment-764830749


   Have you tried the ExpireSnapshotsAction? It has a slightly different
   internal implementation, so I think it actually will preserve the file even
   with the procedure. But I think your concerns are all very valid.
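   
   For reference, a rough sketch of what the Spark-based action could look
   like for this table (assuming the 0.11.x Actions entry point, and that
   ExpireSnapshotsAction mirrors the table API's expireSnapshotId; treat this
   as illustrative rather than exact):
   
   // Illustrative only: expire the first snapshot via the Spark action, which
   // recomputes the set of files still reachable from the remaining snapshots
   // before deleting anything
   import org.apache.iceberg.actions.Actions
   
   Actions.forTable(table)
     .expireSnapshots()
     .expireSnapshotId(table.snapshots.head.snapshotId)
     .execute()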
   
   > On Jan 21, 2021, at 11:56 AM, Scott Kruger <[email protected]> wrote:
   > 
   > 
   > It's possible to use the API to construct snapshots in such a way that
   > expiring snapshots (with file deletion enabled) causes active data files
   > to be deleted. This happens with an iceberg table that's manually managed
   > over raw parquet files written by spark (doesn't really bear going into
   > why). The basic steps are:
   > 
   > 1. Create a partitioned iceberg table
   > 2. Write two partitions (p1 and p2) as raw parquet data via spark
   > 3. Append files to iceberg table
   > 4. IMPORTANT: Commit iceberg overwrite that:
   >    - Deletes files appended in step 3
   >    - Re-adds those same files
   > 5. Expire snapshot 1 with file deletion enabled
   > 6. Write raw parquet data to a new directory containing data for
   >    partitions p2 and p3 (note that p2 is the same partition as in step 2)
   > 7. Commit iceberg overwrite that:
   >    - Deletes files in snapshot 2 from partition p2
   >    - Adds all new files from step 6
   > 8. Expire snapshot 2 with file deletion enabled
   > 
   > Reading the iceberg table now fails because the files from p1, which are
   > still active files, were deleted by the snapshot expiration in step 8.
   > 
   > Here's a script that shows how to reproduce:
   > 
   > // vim: set ts=2 sw=2 bs=2 et
   > 
   > // Use Vimux bindings to evaluate expressions in the shell pane. For example:
   > //
   > // * In normal mode, use \vs to evaluate the block under the cursor
   > // * In (block) visual mode, use \vs to evaluate the highlighted block
   > 
   > import java.sql.{Date,Timestamp}
   > import java.time.LocalDate
   > import org.apache.hadoop.fs._
   > import org.apache.spark.sql._
   > import org.apache.spark.sql.types._
   > import org.apache.spark.sql.functions._
   > import org.apache.spark.sql.expressions._
   > import scala.collection.JavaConversions._
   > import spark.implicits._
   > import org.apache.iceberg.hadoop.{HadoopTables,HadoopInputFile}
   > import org.apache.iceberg.spark.SparkSchemaUtil
   > import org.apache.iceberg.parquet.ParquetUtil
   > import org.apache.iceberg.{PartitionSpec,DataFiles,MetricsConfig}
   > 
   > val tableDir = "hdfs:///tmp/iceberg-table"
   > 
   > val fs = new Path(tableDir).getFileSystem(sc.hadoopConfiguration)
   > fs.delete(new Path(tableDir), true)
   > 
   > val tables = new HadoopTables(sc.hadoopConfiguration)
   > 
   > // Create simple table partitioned by day
   > val df1 = (
   >   spark
   >     .range(10L)
   >     .select(
   >       'id,
   >       concat(lit("value"), 'id) as 'value,
   >       when('id < 5L, Timestamp.valueOf("2021-01-19 00:00:00")).otherwise(Timestamp.valueOf("2021-01-20 00:00:00")) as 'ts
   >     )
   > )
   > 
   > val schema = SparkSchemaUtil.convert(df1.schema)
   > val table = tables.create(
   >   schema,
   >   PartitionSpec.builderFor(schema).day("ts").build,
   >   tableDir
   > )
   > 
   > // Get data files from a path
   > def getDataFiles(path: String) = {
   >   fs
   >     .globStatus(new Path(path, "*/*.parquet"))
   >     .map({ status => HadoopInputFile.fromStatus(status, sc.hadoopConfiguration) })
   >     .map({ inputFile =>
   >       DataFiles
   >         .builder(table.spec)
   >         .withInputFile(inputFile)
   >         .withMetrics(ParquetUtil.fileMetrics(inputFile, MetricsConfig.getDefault))
   >         .withPartitionPath(new Path(inputFile.location).getParent.getName)
   >         .build
   >     })
   >     .toSeq
   > }
   > 
   > // Write dataframe as raw parquet
   > (
   >   df1
   >     .withColumn("ts_day", date_format('ts, "yyyy-MM-dd"))
   >     .repartition(2)
   >     .sortWithinPartitions('ts)
   >     .write
   >     .partitionBy("ts_day")
   >     .mode("overwrite")
   >     .parquet(s"$tableDir/data/commit1")
   > )
   > 
   > // Append data files to iceberg table
   > val dataFiles = getDataFiles(s"$tableDir/data/commit1")
   > 
   > val append = table.newFastAppend
   > dataFiles.foreach(append.appendFile)
   > append.commit
   > table.refresh
   > 
   > // Table data appears OK
   > spark.read.format("iceberg").load(tableDir).show
   > 
   > // Issue an overwrite in which the appended datafiles are deleted, then re-added
   > val overwrite = table.newOverwrite
   > dataFiles.foreach(overwrite.deleteFile)
   > dataFiles.foreach(overwrite.addFile)
   > overwrite.commit
   > table.refresh
   > 
   > // Table data appears OK
   > spark.read.format("iceberg").load(tableDir).show
   > 
   > // Expire first snapshot (append) with file cleanup enabled
   > table.expireSnapshots.expireSnapshotId(table.snapshots.head.snapshotId).cleanExpiredFiles(true).commit
   > table.refresh
   > 
   > // Table data appears OK
   > spark.read.format("iceberg").load(tableDir).show
   > 
   > // Write new parquet data, with one new (2021-01-21) and one overwritten (2021-01-20) partition
   > (
   >   spark
   >     .range(5L, 15L)
   >     .select(
   >       'id,
   >       concat(lit("value"), 'id) as 'value,
   >       when('id < 10L, Timestamp.valueOf("2021-01-20 00:00:00")).otherwise(Timestamp.valueOf("2021-01-21 00:00:00")) as 'ts
   >     )
   >     .withColumn("ts_day", date_format('ts, "yyyy-MM-dd"))
   >     .repartition(2)
   >     .sortWithinPartitions('ts)
   >     .write
   >     .partitionBy("ts_day")
   >     .mode("overwrite")
   >     .parquet(s"$tableDir/data/commit2")
   > )
   > 
   > // Do an overwrite that deletes the old files from the overwritten partition
   > // (2021-01-20) and adds the files we just wrote for the overwritten and new
   > // partitions
   > //
   > val dataFiles2 = getDataFiles(s"$tableDir/data/commit2")
   > 
   > val overwrite = table.newOverwrite
   > dataFiles.filter({ file => LocalDate.ofEpochDay(file.partition.get(0, classOf[Integer]).toLong) == LocalDate.of(2021, 1, 20) }).foreach(overwrite.deleteFile)
   > dataFiles2.foreach(overwrite.addFile)
   > overwrite.commit
   > table.refresh
   > 
   > // Expire the second commit (the first overwrite)
   > table.expireSnapshots.expireSnapshotId(table.snapshots.head.snapshotId).cleanExpiredFiles(true).commit
   > table.refresh
   > 
   > // Throws an exception because the files from the original commit for the
   > // partition 2021-01-19 have been deleted, even though they were not affected
   > // by the most recent overwrite
   > spark.read.format("iceberg").load(tableDir).show
   > 
   > Clearly there's user error here (we shouldn't be deleting and re-adding
   > the same files added in the previous snapshot), but it feels like iceberg
   > is doing the wrong thing as well, as it deletes files that it still
   > considers active. It feels like the right solution is one of the
   > following:
   > 
   > 1. Reject the commit in step 4 with an exception
   > 2. Warn the user that they're trying to both add and delete the same
   >    files and silently remove the affected files from the delete list
   >    (a rough sketch of this idea follows below)
   > 3. Detect during the expiration that the files to be deleted are still
   >    active and prevent them from getting deleted
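   > 
   > For illustration, a rough user-side version of the second idea; the
   > filesToAdd / filesToDelete names are placeholders for this sketch, not an
   > existing iceberg API:
   > 
   > // Hypothetical guard: drop any file that is both deleted and re-added in
   > // the same overwrite, so the delete list only carries a net removal
   > val pathsToAdd = filesToAdd.map(_.path.toString).toSet
   > val netDeletes = filesToDelete.filterNot({ file => pathsToAdd.contains(file.path.toString) })
   > 
   > val guardedOverwrite = table.newOverwrite
   > netDeletes.foreach(guardedOverwrite.deleteFile)
   > filesToAdd.foreach(guardedOverwrite.addFile)
   > guardedOverwrite.commit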