Hi all, I have a non-Iceberg Spark streaming process that I'm trying to re-engineer, and I'm running into some trouble making it work with Iceberg. I think I'm using a fairly common pattern, so I'm hoping someone here can give me a tip on how to go about it. I'll try to be concise but give enough detail to convey the problem:
It's a Spark streaming app that takes micro-batches of data from a Kafka topic, does some transformation, sorts the in-flight data, and then writes the micro-batch out into an existing ORC "table" (not a Hive table, just an HDFS directory that contains all of the partitions). The table is partitioned by date+hour, and within each partition the data is ordered by a string field. There is a constant, high-volume stream of incoming data, so micro-batches are processed frequently, and each one creates an additional set of ORC files within the table. That produces far more files than is optimal, so once all the data for an hour has finally been written out, a separate job "compacts"/coalesces that hour of data (in other words, it gets re-written to a smaller number of ORC files). I've pasted a rough sketch of the current micro-batch write at the bottom of this mail, in case it helps.

Why do it this way?

* Data is available for search/analysis almost immediately. All the previous hours of data, having been compacted, are well optimized, and having one poorly optimized hour is a fine trade for being able to access the most recent data too.

* Writing many smaller ORC files for the current hour allows each file to keep the correct ordering, which turns out to be important: using ORC's bloom filters (a.k.a. "light indexes") in combination with the sorted data vastly improves search performance.

The major pain point is "compaction": because that process rewrites the hour of data and then replaces the existing files, it breaks any already-running analyses that happen to need rows from that hour. I want to refactor to use Iceberg so that those compactions can happen seamlessly, thanks to snapshots.

What I *think* I need is a way to get Iceberg to create new files within the table for each micro-batch. At first I thought that SparkPartitionedFanoutWriter might be the right tool, but (a) it doesn't seem to support ORC, and (b), if I'm reading it right, it uses a size threshold to decide when to roll over to additional files, which isn't what I need.

Is there a simple answer here, or would this use case need a new feature in Iceberg? Or maybe this is an outdated pattern, and I should be doing it a different way?

Thank you for bearing with me. Any suggestions are appreciated.

- Peter
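P.S. In case it helps clarify the pattern, here is a rough, simplified sketch of what the current (non-Iceberg) micro-batch write looks like. The paths, broker/topic/column names, and the transformation step are all made up for illustration, not the real job:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder().appName("orc-stream").getOrCreate()

    // Stand-in for the real logic that parses the Kafka payload and transforms it.
    def myTransform(df: DataFrame): DataFrame = df

    // Read micro-batches from Kafka (placeholder broker and topic names).
    val stream = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()

    stream.writeStream
      .option("checkpointLocation", "hdfs:///checkpoints/events")  // placeholder
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        batch
          .transform(myTransform)
          .sortWithinPartitions("sort_key")   // keep rows ordered by the string field within each output file
          .write
          .mode("append")
          .partitionBy("date", "hour")        // date+hour partitioning
          .orc("hdfs:///data/events")         // plain HDFS directory, not a Hive table
      }
      .start()

Each micro-batch appends a new set of sorted ORC files under the date/hour directories, and the separate hourly job later rewrites that hour into fewer files.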