Hi Lian,

We've built most of this. I believe that it works to do this from Flink, as long as you supply the columns that you want to use to identify rows for the upsert. We're also adding identity columns in table metadata so that you can do this through Flink SQL.
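To make that concrete, here is a rough sketch of what the sink configuration looks like from a DataStream job. This is not copied from our tests: the table location, schema, and the "id" column are placeholders for whatever your table uses.

    import java.util.Arrays;

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.DataTypes;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.types.Row;
    import org.apache.iceberg.flink.TableLoader;
    import org.apache.iceberg.flink.sink.FlinkSink;

    public class UpsertSinkSketch {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in for the Kafka source: two rows with the same id, so the second
        // row should replace the first one in the type 1 snapshot.
        RowTypeInfo rowType = new RowTypeInfo(Types.INT, Types.STRING);
        DataStream<Row> stream = env.fromCollection(
            Arrays.asList(Row.of(1, "first"), Row.of(1, "second")), rowType);

        // Flink schema matching the Iceberg table (id INT, data STRING); the table
        // location below is a placeholder.
        TableSchema schema = TableSchema.builder()
            .field("id", DataTypes.INT())
            .field("data", DataTypes.STRING())
            .build();
        TableLoader tableLoader =
            TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/events_snapshot");

        FlinkSink.forRow(stream, schema)
            .tableLoader(tableLoader)
            .tableSchema(schema)
            // Columns that identify rows for the upsert; matching rows are written
            // as equality deletes + inserts, which is why the table needs to be v2.
            .equalityFieldColumns(Arrays.asList("id"))
            .writeParallelism(1)
            .append();

        env.execute("iceberg-upsert-sketch");
      }
    }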
That should work, but I should also warn you that this uses the v2 format, which we haven't finalized yet (coming soon). So you'd need to do a little work to update the table to v2, because we don't expose v2 publicly yet; a rough sketch of one way to do that is at the end of this message, after the quoted thread. It should be fine for testing, though.

Here's our internal test where we configure the sink:
https://github.com/apache/iceberg/blob/master/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSinkV2.java#L155-L160

I hope that helps! We're working on finalizing v2 so that we can recommend it for this use case in production soon.

Ryan

On Tue, May 4, 2021 at 8:43 PM Lian Jiang <jiangok2...@gmail.com> wrote:

> Hi,
>
> I want to dump events from a Kafka topic into the data lake as a type 1
> snapshot in Iceberg. Type 1 means a record with a given key overwrites the
> previous record with the same key, so each key has only one record in the
> snapshot.
>
> Note that I want to simplify the long path:
>
>   kafka -> (streaming job) -> type 2 historical -> (spark job) -> type 1 snapshot
>
> to the short path:
>
>   kafka -> (streaming job) -> type 1 snapshot
>
> The short path requires the streaming job to directly update the type 1
> snapshot using row-level updates (e.g. "merge into"). The streaming job can
> be either Spark Structured Streaming (SSS) or Flink.
>
> According to https://iceberg.apache.org/spark-structured-streaming/, SSS
> does not support "merge into".
>
> According to https://iceberg.apache.org/flink/#insert-into, a Flink
> streaming job does not support "merge into" or "insert into".
>
> Is this a dead end? Appreciate any hints.

--
Ryan Blue
Tabular
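For the v2 upgrade mentioned above, here is a rough sketch of one way to do it today. It assumes the table lives at a Hadoop-tables location (the path is a placeholder; a catalog table would be loaded differently) and it goes through the table's internal metadata operations rather than a public API, which is the "little work" referred to in the reply.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.iceberg.BaseTable;
    import org.apache.iceberg.Table;
    import org.apache.iceberg.TableMetadata;
    import org.apache.iceberg.TableOperations;
    import org.apache.iceberg.hadoop.HadoopTables;

    public class UpgradeTableToV2 {
      public static void main(String[] args) {
        // Placeholder location: the same table the Flink sink writes to.
        Table table = new HadoopTables(new Configuration())
            .load("hdfs://nn:8020/warehouse/db/events_snapshot");

        // Bump the format version to 2 by committing new metadata through the
        // table's operations. This uses internal classes because v2 isn't
        // exposed through a public upgrade path yet.
        TableOperations ops = ((BaseTable) table).operations();
        TableMetadata base = ops.current();
        ops.commit(base, base.upgradeToFormatVersion(2));
      }
    }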