Hi all, I have a non-Iceberg Spark streaming process that I'm trying
to re-engineer, and I'm running into some trouble making it happen
with Iceberg.  I think I'm using a fairly common pattern, so I'm
hoping someone here can give me a tip on how to go about it.  I'll
try to be concise but give enough detail to convey the problem:

It's a Spark streaming app that takes micro-batches of data from a
Kafka topic, does some transformation on it, sorts the in-flight
data, and then writes the micro-batch out into an existing ORC
"table" (not a Hive table, just an HDFS directory that contains all
of the partitions).  The table is partitioned by date+hour, and
within each partition the data is ordered by a string field.  There
is a constant, high-volume stream of incoming data, so micro-batches
are processed frequently, and each one creates an additional set of
ORC files within the table.  This results in far more files than is
optimal, so once all the data for an hour has finally been written
out, a separate job "compacts"/coalesces that hour of data (in other
words, it gets rewritten to a smaller number of ORC files).
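
For concreteness, the write side of the current job looks roughly
like the sketch below (broker, topic, paths, and column names are
placeholders, not the real ones):

    import org.apache.spark.sql.{DataFrame, SparkSession}

    val spark = SparkSession.builder().appName("orc-stream").getOrCreate()

    // Micro-batches arrive from Kafka (placeholder broker/topic).
    val source = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092")
      .option("subscribe", "events")
      .load()

    // ... parsing/transformation elided ...
    val transformed: DataFrame = source

    transformed.writeStream
      .option("checkpointLocation", "hdfs:///checkpoints/events")
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        batch
          // sort inside each task so every output file stays ordered
          // by the search key
          .sortWithinPartitions("event_date", "event_hour", "sort_key")
          .write
          .mode("append")
          .partitionBy("event_date", "event_hour")  // date+hour layout
          .orc("hdfs:///data/events")  // plain HDFS dir, not Hive
      }
      .start()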

Why do it this way?
* Data is available for search/analysis almost immediately.  All the
previous hours of data, having been compacted, are well optimized,
and having one poorly optimized hour is a fair trade for being able
to access the most recent data as well.
* Writing many smaller ORC files for the current hour allows each
file to keep the correct ordering, which turns out to be important:
using ORC's bloom filters (AKA "light indexes") in combination with
the sorted data vastly improves search performance (see the sketch
after this list).
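
To illustrate that second point: in the per-batch write I enable ORC
bloom filters on the sorted column by passing the ORC writer
properties through the data source options, roughly like this
(again, the column names are placeholders):

    // Same write as in the earlier sketch, plus bloom filters on the
    // sorted column.
    batch
      .sortWithinPartitions("event_date", "event_hour", "sort_key")
      .write
      .mode("append")
      .partitionBy("event_date", "event_hour")
      .option("orc.bloom.filter.columns", "sort_key")
      .option("orc.bloom.filter.fpp", "0.05")  // false-positive rate
      .orc("hdfs:///data/events")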

The major pain point is "compaction": because that process rewrites
the hour of data and then replaces the existing files, it breaks any
already-running analyses that happen to need rows from that hour.  I
want to refactor to use Iceberg so that, thanks to snapshots, those
compactions can happen seamlessly.
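
If I'm reading the Iceberg docs right, that compaction job would
become something like the rewrite_data_files procedure below, run
once an hour's data is complete (catalog, table, and column names
are placeholders, and I may well have the parameters wrong):

    // Placeholder names; exact procedure parameters may differ by
    // Iceberg version.
    spark.sql("""
      CALL my_catalog.system.rewrite_data_files(
        table      => 'db.events',
        strategy   => 'sort',
        sort_order => 'sort_key ASC',
        where      => 'event_date = "2024-01-01" and event_hour = 3'
      )
    """)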

What I *think* I need is a way to get Iceberg to create a new set of
files within the table for each micro-batch.  At first I thought
that the SparkPartitionedFanoutWriter might be the right tool, but
(a) it doesn't seem to support ORC, and (b) if I'm reading it right,
it uses a size threshold to decide when to roll over to additional
files, which isn't what I need.  Is there a simple answer here, or
would I need a new feature in Iceberg to support this use case?  Or
maybe this is an outdated pattern and I should be doing it a
different way?
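
For reference, the per-batch write I've been sketching on the
Iceberg side is just a plain streaming append, roughly as below
(placeholder names again); what I can't see is how to get the
per-file ordering and the file-set-per-micro-batch behavior I
described above out of it:

    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger

    transformed.writeStream
      .format("iceberg")
      .outputMode("append")
      .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
      .option("checkpointLocation", "hdfs:///checkpoints/iceberg")
      // if I understand it right, this enables the fanout writer for
      // partitioned streaming writes
      .option("fanout-enabled", "true")
      .toTable("my_catalog.db.events")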

Thank you for bearing with me.  Any suggestions are appreciated.

- Peter
