Here's another resource for write-audit-publish, an example notebook from
Robin Moffatt:
https://github.com/tabular-io/docker-spark-iceberg/blob/main/spark/notebooks/Iceberg%20-%20Write-Audit-Publish%20(WAP)%20with%20Branches.ipynb

I don't think that's going to quite cover what Nirav was asking though. It
looks like the desired behavior is either to deduplicate records as they
are written, or to ensure that each write happens exactly once. The Spark
streaming Iceberg write path implements exactly-once semantics, so you
should just be able to run your write operation.

If what you're looking for is row-level deduplication, I'd recommend the
MERGE strategy. An INSERT to Iceberg is always a plain append, so it never
incurs unexpected additional processing. Using a MERGE makes the intent
explicit, and MERGE is also very flexible.
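
For illustration, here is a minimal sketch of the foreachBatch + MERGE
approach. It assumes the Iceberg Spark SQL extensions are enabled (MERGE
INTO needs them) and reuses the `logs` table and `uniqueId` key from the
original question. The object name, the helper names, and the temp view
name `new_deduped_logs` are made up for this example and are not part of
any Iceberg API.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Hypothetical helper object; the names here are illustrative only.
object IdempotentAppend {

  // Builds an insert-only MERGE: source rows whose key already exists
  // in the target are skipped, so replaying a batch adds nothing.
  def mergeSql(target: String, source: String, key: String): String =
    s"""MERGE INTO $target t
       |USING $source s
       |ON t.$key = s.$key
       |WHEN NOT MATCHED THEN INSERT *""".stripMargin

  // Runs once per micro-batch. foreachBatch is at-least-once, so this
  // must be safe to run again for the same batch.
  def upsertBatch(target: String, key: String)(batch: DataFrame, batchId: Long): Unit = {
    // Deduplicate inside the batch first: an insert-only MERGE would
    // otherwise insert every source row that shares a new key.
    val deduped = batch.dropDuplicates(key)
    deduped.createOrReplaceTempView("new_deduped_logs")
    // Use the batch's own session so the temp view is visible to the query.
    deduped.sparkSession.sql(mergeSql(target, "new_deduped_logs", key))
    ()
  }

  def start(data: DataFrame, checkpointPath: String): StreamingQuery = {
    // An explicitly typed function value avoids the overload ambiguity
    // between the Scala and Java variants of foreachBatch.
    val writer: (DataFrame, Long) => Unit = upsertBatch("logs", "uniqueId") _
    data.writeStream
      .trigger(Trigger.ProcessingTime("1 minute"))
      .option("checkpointLocation", checkpointPath)
      .foreachBatch(writer)
      .start()
  }
}
```

The MERGE has to check the target for matching keys, so it will cost more
than a plain append. If the duplicates you care about always land in a
known partition or time range, adding that condition to the ON clause
should help limit how much of the table is scanned.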

Ryan

On Thu, Jul 20, 2023 at 1:45 PM Christina Larsen <
christinalarsen...@gmail.com> wrote:

> Iceberg supports the write-audit-publish workflow, which does essentially
> what you want at a batch level, either with individual snapshots or with
> branches, but the former at least is not well documented by Iceberg at
> present. I don't have the bandwidth to do a more detailed write-up on it
> right now, but some helpful resources are:
>
> https://github.com/apache/iceberg/blob/master/core/src/test/java/org/apache/iceberg/TestWapWorkflow.java
> https://iceberg.apache.org/docs/latest/branching/
> https://tabular.io/blog/integrated-audits/
>
> https://www.dremio.com/blog/streamlining-data-quality-in-apache-iceberg-with-write-audit-publish-branching/
>
> https://www.dremio.com/wp-content/uploads/2022/05/Sam-Redai-The-Write-Audit-Publish-Pattern-via-Apache-Iceberg.pdf
>
>
> On Thu, Jul 20, 2023 at 12:34 PM Nirav Patel <nira...@gmail.com> wrote:
>
>> Hi,
>>
>> I'm using Spark Structured Streaming to append to a partitioned Iceberg
>> table. I am using a custom Iceberg catalog (the GCP BigLake Iceberg
>> catalog) to upsert data into Iceberg tables that are backed by the GCP
>> BigLake metastore.
>>
>> There are multiple ways to append streaming data into a partitioned
>> table. The one mentioned in the Iceberg docs doesn't work as expected.
>> (It could be a catalog implementation issue.)
>>
>> The following overwrites some of the records in the Parquet file when
>> multiple records ingested in different batches belong to the same
>> partition.
>>
>> val tableIdentifier: String = ...
>> data.writeStream
>>     .format("iceberg")
>>     .outputMode("append")
>>     .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
>>     .option("path", tableIdentifier)
>>     .option("fanout-enabled", "true")
>>     .option("checkpointLocation", checkpointPath)
>>     .start()
>>
>> Does the above option ensure exactly-once in case reprocessing happens?
>> Also, it will not work for idempotent updates, right?
>>
>> My workaround for the data issue caused by the above is to use a custom
>> foreachBatch function that does batch upserts using a MERGE INTO query,
>> e.g.:
>>
>> MERGE INTO logs
>> USING newDedupedLogs
>> ON logs.uniqueId = newDedupedLogs.uniqueId
>> WHEN NOT MATCHED
>>   THEN INSERT *
>>
>> So even though foreachBatch gives only an at-least-once guarantee,
>> `MERGE INTO` will never insert duplicate records. However, could the
>> cost of the write be higher now? Is there any other option with Spark
>> streaming + Iceberg to do dedup and idempotent writes (in the event of
>> reprocessing or just duplicate records)?
>>
>> I see that Delta tables have the options "txnVersion" and "txnAppId",
>> which allow them to drop duplicates before writing, like the following.
>>
>> def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
>>   (batch_df.write.format(...)
>>       .option("txnVersion", batch_id)
>>       .option("txnAppId", app_id)
>>       .save(...))  # location 1
>>
>> Does something similar exist for Iceberg? If not, do you see any issue
>> with the `foreachBatch` and `MERGE INTO ... WHEN NOT MATCHED ...`
>> approach at production scale?
>>
>> I have posted a question on SO regarding this as well:
>>
>> https://stackoverflow.com/questions/76726225/spark-structured-streaming-apache-iceberg-how-appends-can-be-idempotent
>>
>> Thanks!
>> Nirav
>>
>>

-- 
Ryan Blue
Tabular
