I find it useful to delineate two kinds of things:

1. Mutations, such as database table updates. These always have a key.
2. Immutable events, such as clicks, sales, orders, etc.
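(A minimal sketch of this distinction in plain Java, with made-up keys and values rather than Kafka's actual log cleaner: keyed updates collapse to the latest value per key, which is what compaction exploits below, while keyless events have nothing to collapse.)

    import java.util.*;

    public class CompactionSketch {
        // Case (1): keyed mutations -- compaction keeps only the last value seen for each key.
        static Map<String, String> compact(String[][] keyedUpdates) {
            Map<String, String> latest = new LinkedHashMap<String, String>();
            for (String[] update : keyedUpdates) {
                latest.put(update[0], update[1]); // a later update for the same key overwrites the earlier one
            }
            return latest;
        }

        public static void main(String[] args) {
            String[][] updates = {
                {"user-42", "address=Oak St"},
                {"user-7",  "address=Elm St"},
                {"user-42", "address=Main St"}   // supersedes the first user-42 record
            };
            System.out.println(compact(updates)); // {user-42=address=Main St, user-7=address=Elm St}

            // Case (2): keyless events (clicks, sales, orders) -- each record stands alone,
            // so there is nothing to overwrite: the log and the "snapshot" are the same thing.
            List<String> events = Arrays.asList("click", "sale", "click");
            System.out.println(events);
        }
    }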
The whole premise of compaction is that you have some redundant updates as in case (1). In order to have updates you have to have some kind of identifier that says *what* is updated. If you don't have that then you have a true event stream (2), which logically can't be compacted because each event stands alone. In this latter case (i.e. 2) the log and the snapshot are in fact the same thing, so you get snapshots automatically.

Does that make sense? I may be missing some aspect of your question...

-Jay

On Mon, Feb 23, 2015 at 11:37 AM, Thomas Bernhardt <bernhardt...@yahoo.com.invalid> wrote:

> Assume that there is data that doesn't have a key, how would you handle that? Would you always have a key and therefore generate one?
>
> Best regards, Tom
>
> From: Felix GV <fville...@linkedin.com.INVALID>
> To: "dev@samza.apache.org" <dev@samza.apache.org>
> Sent: Monday, February 23, 2015 2:15 PM
> Subject: RE: Re-processing a la Kappa/Liquid
>
> A recently-compacted topic is pretty similar to a snapshot in terms of semantics, no? If you read the topic up until the point where the compaction ended, you effectively read every key just once, same as a snapshot.
>
> I agree that the guaranteed uncompacted/dirty retention period would be useful.
>
> --
>
> Felix GV
> Data Infrastructure Engineer
> Distributed Data Systems
> LinkedIn
>
> f...@linkedin.com
> linkedin.com/in/felixgv
>
> ________________________________________
>
> From: Roger Hoover [roger.hoo...@gmail.com]
> Sent: Monday, February 23, 2015 10:33 AM
> To: dev@samza.apache.org
> Subject: Re: Re-processing a la Kappa/Liquid
>
> Thanks, Julian.
>
> I didn't see any mention of checkpoints in the Kappa or Liquid information I've read, but it does seem like a very useful optimization to make re-processing and failure recovery much faster. Databus supports snapshots, I believe, so that DB replicas can be initialized in a practical amount of time.
>
> Interested to know if Jay, Chris, or others have thought about how snapshots might fit with Kafka and/or Samza. Is it something Kafka should provide at some point, or would it be layered on top?
>
> Cheers,
>
> Roger
>
> On Sun, Feb 22, 2015 at 1:13 AM, Julian Hyde <jul...@hydromatic.net> wrote:
>
> > Can I quibble with semantics?
> >
> > This problem seems to be more naturally a stream-to-stream join, not a stream-to-table join. It seems unreasonable to expect the system to be able to give you the state of a table at a given moment in the past, but it is reasonable to ask for the stream up to that point.
> >
> > A stream and the archive of a table (its contents at every moment in the past) are equivalent in theory (they have exactly the same information content) but different in practice: (1) there are different costs to access them (it is costly to re-create a table by re-playing a stream of its inserts), and (2) streams are managed internal to the system whereas tables are external. For Roger's problem, (2) is a crucial difference.
> >
> > Then the question is how to throw information away but make it possible, and efficient, to answer the queries we will need to ask in the future.
> >
> > A good way to do this is with checkpoints, replay, and retention. You periodically checkpoint the state of a table (or indeed any stateful stream operator). To re-create the state of an operator at a particular time T, you start with the previous checkpoint and replay until T.
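(A minimal sketch of that checkpoint-and-replay idea in plain Java, not any Samza or Kafka API; the types, names, and timestamps are made up: restore the state as of time T by loading the newest checkpoint taken at or before T, then replaying only the updates in between.)

    import java.util.*;

    public class CheckpointReplaySketch {
        // Hypothetical representation of one keyed update with a timestamp.
        static class Update {
            final long time; final String key; final String value;
            Update(long time, String key, String value) { this.time = time; this.key = key; this.value = value; }
        }

        // Re-create the operator's (or table's) state as of time t: start from the newest
        // checkpoint taken at or before t, then replay the updates in (checkpointTime, t].
        static Map<String, String> restoreAt(long t,
                                             NavigableMap<Long, Map<String, String>> checkpoints,
                                             List<Update> log) {
            Map.Entry<Long, Map<String, String>> cp = checkpoints.floorEntry(t);
            long checkpointTime = (cp == null) ? Long.MIN_VALUE : cp.getKey();
            Map<String, String> state = (cp == null) ? new HashMap<String, String>()
                                                     : new HashMap<String, String>(cp.getValue());
            for (Update u : log) {
                if (u.time > checkpointTime && u.time <= t) {
                    state.put(u.key, u.value);
                }
            }
            return state;
        }

        public static void main(String[] args) {
            NavigableMap<Long, Map<String, String>> checkpoints = new TreeMap<Long, Map<String, String>>();
            checkpoints.put(100L, Collections.singletonMap("user-42", "Oak St"));

            List<Update> log = Arrays.asList(
                new Update(150L, "user-7", "Elm St"),
                new Update(200L, "user-42", "Main St"));

            // State at T=160: the checkpoint at 100 plus the single update between 100 and 160.
            System.out.println(restoreAt(160L, checkpoints, log)); // user-42=Oak St, user-7=Elm St (map order may vary)
        }
    }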
> > How often to checkpoint depends on the size of the operator's state relative to the stream (tables have a lot of state, aggregates have less, and filter and project have no state) and the length of its memory (there is little point making a daily checkpoint for a 1-hour windowed aggregate because you can restore state by starting with *any* checkpoint and replaying an hour of data).
> >
> > Retention is a contract between the consumer and the up-stream operators. If the consumer says to its source operator "I need you to be able to replay any time-range from Feb 12th onwards", that operator either needs to store its output back to Feb 12th, or it needs to retain the ability to re-create that output. If the latter, then it tells *its* input(s) what time-range they need to be able to re-play, say from Feb 11th. For rapid play-back, it may choose to keep periodic checkpoints.
> >
> > If the final consumer loosens its retention requirements to, say, Feb 19th onwards, then each operator propagates the looser requirement to its input operator(s), and this allows garbage to be collected.
> >
> > I don't know whether checkpoints and retention are spelled out in Kappa/Liquid, but if not, they seem a natural and useful extension to the theory.
> >
> > Julian
> >
> > > On Feb 21, 2015, at 4:51 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> > >
> > > Thanks, Jay. This is one of the really nice advantages of local state in my mind. Full retention would work but eventually run out of space, right? Ideally, Kafka would guarantee to keep dirty keys for a configurable amount of time, as Chris suggested.
> > >
> > > Sent from my iPhone
> > >
> > >> On Feb 21, 2015, at 10:10 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >>
> > >> Gotcha. Yes, if you want to be able to join to past versions you definitely can't turn on compaction, as the whole goal of that feature is to delete past versions. But wouldn't it work to use full retention if you want that (and use the MessageChooser interface during reprocessing if you want tight control over the state recreation)? I mean, you have the same dilemma if you don't use local state but instead use a remote store--the remote store likely only keeps the last version of each value, so you can't join to the past.
> > >>
> > >> -Jay
> > >>
> > >> On Fri, Feb 20, 2015 at 9:04 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> > >>
> > >>> Jay,
> > >>>
> > >>> Sorry, I didn't explain it very well. I'm talking about a stream-table join where the table comes from a compacted topic that is used to populate a local data store. As the stream events are processed, they are joined with dimension data from the local store.
> > >>>
> > >>> If you want to kick off another version of this job that starts back in time, the new job cannot reliably recreate the same state of the local store that the original had, because old values may have been compacted away.
> > >>>
> > >>> Does that make sense?
> > >>>
> > >>> Roger
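(A minimal sketch of the stream-table join Roger describes, in plain Java rather than actual Samza task code; the store contents and field names are made up. The local store is rebuilt by replaying a changelog topic and each stream event is enriched from it, so if old values for a key have been compacted out of the changelog, a job replaying from an earlier point can no longer rebuild the store as it looked back then.)

    import java.util.*;

    public class StreamTableJoinSketch {
        // Local "table" populated from a (possibly compacted) changelog topic.
        private final Map<String, String> userTable = new HashMap<String, String>();

        // Restore the local store by replaying changelog entries (latest value per key wins).
        void restoreFromChangelog(List<String[]> changelog) {
            for (String[] entry : changelog) {
                userTable.put(entry[0], entry[1]);
            }
        }

        // Join one stream event (e.g. a click carrying a userId) against the local store.
        String process(String userId, String event) {
            String dimension = userTable.get(userId);
            return event + " by " + (dimension != null ? dimension : "<unknown user>");
        }

        public static void main(String[] args) {
            StreamTableJoinSketch task = new StreamTableJoinSketch();
            task.restoreFromChangelog(Arrays.asList(
                new String[] {"user-42", "Alice (Oak St)"},
                new String[] {"user-7",  "Bob (Elm St)"}));
            System.out.println(task.process("user-42", "click")); // click by Alice (Oak St)
        }
    }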
> > >>>> On Fri, Feb 20, 2015 at 2:52 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >>>>
> > >>>> Hey Roger,
> > >>>>
> > >>>> I'm not sure if I understand the case you are describing.
> > >>>>
> > >>>> As Chris says, we don't yet give you fine-grained control over when history starts to disappear (though we designed with the intention of making that configurable later). However, I'm not sure if you need that for the case you describe.
> > >>>>
> > >>>> Say you have a job J that takes inputs I1...IN and produces output O1...ON and in the process accumulates state in a topic S. I think the approach is to launch a J' (changed or improved in some way) that reprocesses I1...IN from the beginning of time (or some past point) into O1'...ON' and accumulates state in S'. So the state for J and the state for J' are totally independent. J' can't reuse J's state in general because the code that generates that state may have changed.
> > >>>>
> > >>>> -Jay
> > >>>>
> > >>>> On Thu, Feb 19, 2015 at 9:30 AM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> > >>>>
> > >>>>> Chris + Samza Devs,
> > >>>>>
> > >>>>> I was wondering whether Samza could support re-processing as described by the Kappa architecture or Liquid (http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
> > >>>>>
> > >>>>> It seems that a changelog is not sufficient to be able to restore state backward in time. Kafka compaction will guarantee that local state can be restored from where it left off, but I don't see how it can restore past state.
> > >>>>>
> > >>>>> Imagine the case where a stream job has a lot of state in its local store but it has not updated any keys in a long time.
> > >>>>>
> > >>>>> Time t1: All of the data would be in the tail of the Kafka log (past the cleaner point).
> > >>>>> Time t2: The job updates some keys. Now we're in a state where the next compaction will blow away the old values for those keys.
> > >>>>> Time t3: Compaction occurs and old values are discarded.
> > >>>>>
> > >>>>> Say we want to launch a re-processing job that would begin from t1. If we launch that job before t3, it will correctly restore its state. However, if we launch the job after t3, it will be missing old values, right?
> > >>>>>
> > >>>>> Unless I'm misunderstanding something, the only way around this is to keep snapshots in addition to the changelog. Has there been any discussion of providing an option in Samza of taking RocksDB snapshots and persisting them to an object store or HDFS?
> > >>>>>
> > >>>>> Thanks,
> > >>>>>
> > >>>>> Roger
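(A minimal sketch of Roger's t1/t2/t3 scenario, modelling the compacted changelog as a plain Java list rather than Kafka's actual cleaner; the data is made up. Replaying the changelog as it stood at t1 reproduces the old store state, but replaying it after compaction at t3 cannot, because the pre-update value for the rewritten key is gone.)

    import java.util.*;

    public class CompactionTimelineSketch {
        // Replay a changelog (a list of {key, value} entries) into a store; the newest value per key wins.
        static Map<String, String> replay(List<String[]> changelog) {
            Map<String, String> store = new LinkedHashMap<String, String>();
            for (String[] entry : changelog) {
                store.put(entry[0], entry[1]);
            }
            return store;
        }

        // Toy compaction: keep only the last entry for each key.
        static List<String[]> compact(List<String[]> changelog) {
            Map<String, String> latest = replay(changelog);
            List<String[]> compacted = new ArrayList<String[]>();
            for (Map.Entry<String, String> e : latest.entrySet()) {
                compacted.add(new String[] {e.getKey(), e.getValue()});
            }
            return compacted;
        }

        public static void main(String[] args) {
            // t1: the changelog still contains the old value for user-42.
            List<String[]> atT1 = new ArrayList<String[]>();
            atT1.add(new String[] {"user-42", "Oak St"});
            atT1.add(new String[] {"user-7",  "Elm St"});
            System.out.println("state at t1:      " + replay(atT1));

            // t2: the job updates user-42; the old value is now eligible for compaction.
            List<String[]> atT2 = new ArrayList<String[]>(atT1);
            atT2.add(new String[] {"user-42", "Main St"});

            // t3: compaction runs and the Oak St entry is discarded.
            List<String[]> atT3 = compact(atT2);

            // A job launched after t3 that tries to rebuild the t1 state only sees what survived:
            System.out.println("rebuilt after t3: " + replay(atT3)); // user-42 -> Main St; Oak St is unrecoverable
        }
    }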