Noted. I was calling it a snapshot because that's what the prototype constructs to pass down to the file-planning API, but that's just an implementation detail. We could add an appendsBetween(s1, s2) API, but I wanted to keep the original scan API separate from the incremental scan, since the original scan is tied to a single snapshot. Deletes/overwrites at a file level should work; I have a few test cases in the prototype that cover this: https://github.com/apache/incubator-iceberg/pull/315
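To make the shape of the proposal a bit more concrete, usage might look roughly like the sketch below. The entry point and method names (newIncrementalScan, appendsBetween) are illustrative only, not the final API; the actual prototype is in the PR above.

// Sketch only -- newIncrementalScan/appendsBetween are illustrative names,
// not necessarily what the prototype exposes.
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.hadoop.conf.Configuration
import scala.collection.JavaConverters._

// example table location
val table = new HadoopTables(new Configuration()).load("hdfs://nn:8020/warehouse/db/table")

// plan only the data files appended after the start snapshot, up to the end snapshot
val scan = table.newIncrementalScan()
  .appendsBetween(8924558786060583479L, 6536733823181975045L)

scan.planFiles().asScala.foreach { task =>
  // each FileScanTask covers a data file added in the requested snapshot range
  println(task.file().path())
}

The diff-snapshot construction and its cleanup would stay behind planFiles, so callers only ever see the files that were appended between the two snapshots.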
On Fri, Jul 26, 2019 at 10:07 AM Ryan Blue <rb...@netflix.com> wrote:

> Thanks for working on this!
>
> I think the overall idea of being able to plan an incremental scan is a
> good idea. But, we should avoid calling the incremental data a “snapshot”.
> A snapshot is the table state at some point in time, and I think it would
> be confusing if we started adding new meanings.
>
> Is there a reason to make a new incremental scan instead of updating the
> current interface to select just changes? I’m wondering if we could just
> add a refinement method to TableScan instead, like appendsBetween(s1, s2).
>
> How do you propose to handle overwrites and deletes in this interface?
> Should those be ignored? (Maybe the answer to this question is a good
> reason to have a separate IncrementalScan API.)
>
> On Thu, Jul 25, 2019 at 4:03 PM RD <rdsr...@gmail.com> wrote:
>
>> Thanks Ryan,
>>
>>> Iceberg can give you the data files that were added or deleted in a
>>> snapshot, but there isn't a good way to take those and actually read them
>>> as a DataFrame or select that data from a table in SQL. I'd think
>>> that's a good first step
>>
>> One approach that I'm currently prototyping is this.
>>
>> I plan to expose an incrementalScan API as part of the Table API. The
>> IncrementalScan would take start and end snapshot ids, from which it could
>> construct a "diff" snapshot. This snapshot would essentially be composed of
>> all the data files that were added since the start snapshot id. The diff
>> snapshot would be a staging snapshot: it would not be committed to the
>> table and could be deleted asynchronously after being read. We can then
>> read this snapshot using the existing planFiles method
>> [org.apache.iceberg.DataTableScan#planFiles].
>>
>> If you think this approach is workable, I can create an issue and discuss
>> more there with the prototype.
>>
>> -Best.
>>
>> On Wed, Jul 17, 2019 at 12:55 PM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> I think it would be helpful to have a pattern for incremental
>>> processing. Iceberg can give you the data files that were added or deleted
>>> in a snapshot, but there isn't a good way to take those and actually read
>>> them as a DataFrame or select that data from a table in SQL. I'd think
>>> that's a good first step, but I'd also like to work on other concerns, like
>>> rollback and reprocessing. It would also be nice if such a system had a way
>>> to block snapshot expiration until all downstream incremental processes
>>> have completed.
>>>
>>> rb
>>>
>>> On Wed, Jul 17, 2019 at 12:46 PM RD <rdsr...@gmail.com> wrote:
>>>
>>>> Hi Iceberg devs,
>>>> We are starting work on a somewhat similar project. The idea is that
>>>> users can ask for incremental data since the last snapshot they processed,
>>>> i.e. the delta that was added since the last snapshot. Do you think this
>>>> could be a general feature that would be beneficial to Iceberg?
>>>>
>>>> -R
>>>>
>>>> On Wed, Jul 17, 2019 at 10:16 AM Ryan Blue <rb...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> You can do this using time-travel. First, read the table at each
>>>>> snapshot.
>>>>> This creates a temporary view for each snapshot:
>>>>>
>>>>> // create temp views for each snapshot
>>>>> spark.read.format("iceberg").option("snapshot-id", 8924558786060583479L)
>>>>>   .load("db.table").createOrReplaceTempView("s1")
>>>>> spark.read.format("iceberg").option("snapshot-id", 6536733823181975045L)
>>>>>   .load("db.table").createOrReplaceTempView("s2")
>>>>>
>>>>> Next, use a left-outer join and a filter to find the rows that are
>>>>> only in the second snapshot:
>>>>>
>>>>> SELECT s2.* FROM s2 LEFT OUTER JOIN s1 ON s1.id = s2.id
>>>>> WHERE s1.id IS NULL
>>>>>
>>>>> Note that this is an expensive operation if you don’t also filter the
>>>>> snapshots. You probably want to do this with a small table, or you want
>>>>> to use filters to cut down on the data that is loaded and might match.
>>>>> For example, if you’re writing events as they come in from Kafka, you
>>>>> can filter each snapshot using the last day of data, then run the left
>>>>> outer join.
>>>>>
>>>>> On Wed, Jul 17, 2019 at 9:50 AM aa bb <abcd1...@yahoo.com.invalid>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Could you please advise how we can get the delta data changes (diff)
>>>>>> between 2 snapshots?
>>>>>>
>>>>>> Is there any way, given 2 snapshot ids (8924558786060583479,
>>>>>> 6536733823181975045), to get the records that were added after
>>>>>> 8924558786060583479?
>>>>>>
>>>>>> +-------------------------+---------------------+---------------------+---------------------+
>>>>>> | made_current_at         | snapshot_id         | parent_id           | is_current_ancestor |
>>>>>> +-------------------------+---------------------+---------------------+---------------------+
>>>>>> | 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true                |
>>>>>> | 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true                |
>>>>>> +-------------------------+---------------------+---------------------+---------------------+
>>>>>>
>>>>>> Thanks,
>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>
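P.S. To make the filtering suggestion in Ryan's reply above concrete: if both snapshot reads are narrowed to recent data before the join, the diff query stays cheap. A rough sketch follows; the event_time column is only an assumed example (any predicate that narrows both reads works), and the id join key is the one from Ryan's query.

// narrow each snapshot to roughly the last day of data before diffing;
// "event_time" is an assumed column name for this example
spark.read.format("iceberg").option("snapshot-id", 8924558786060583479L)
  .load("db.table")
  .filter("event_time >= date_sub(current_date(), 1)")
  .createOrReplaceTempView("s1_recent")

spark.read.format("iceberg").option("snapshot-id", 6536733823181975045L)
  .load("db.table")
  .filter("event_time >= date_sub(current_date(), 1)")
  .createOrReplaceTempView("s2_recent")

// rows present in the second snapshot but not the first, within the filtered range
spark.sql("""
  SELECT s2_recent.*
  FROM s2_recent LEFT OUTER JOIN s1_recent ON s1_recent.id = s2_recent.id
  WHERE s1_recent.id IS NULL
""").show()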