Can I quibble with semantics? 

This problem seems to be more naturally a stream-to-stream join, not a 
stream-to-table join. It seems unreasonable to expect the system to be able to 
give you the state of a table at a given moment in the past, but it is 
reasonable to ask for the stream up to that point.

A stream and the archive of a table (its contents at every moment in the past) 
are equivalent in theory (they have exactly the same information content) but 
different in practice: (1) there are different costs to access them (it is 
costly to re-create a table by re-playing a stream of its inserts), and (2) 
streams are managed internally by the system whereas tables are external. For
Roger's problem, (2) is a crucial difference.

The question, then, is how to throw information away while still making it
possible, and efficient, to answer the queries we will need to ask in the future.

A good way to do this is with checkpoints, replay, and retention. You 
periodically checkpoint the state of a table (or indeed any stateful stream 
operator). To re-create the state of an operator at a particular time T, you
start with the most recent checkpoint before T and replay until T. How often to
checkpoint depends on the size of the operator's state relative to the stream
(a table has a lot of state, an aggregate has less, and filter and project have
none) and the length of its memory (there is little point making a daily
checkpoint for a one-hour windowed aggregate, because you can restore its state
by starting with *any* checkpoint and replaying an hour of data).
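
To make this concrete, here is a rough sketch of the restore path in Java. The
names (Checkpoint, CheckpointStore, Changelog, Operator) are made up for
illustration; this is not an existing Samza API:

    // Hypothetical sketch, not an existing Samza API: restore an operator's
    // state as of time t by loading the newest checkpoint at or before t and
    // replaying the operator's input from the checkpoint time up to t.
    class CheckpointRestore {
      interface Checkpoint { long timestamp(); }
      interface CheckpointStore { Checkpoint latestAtOrBefore(long time); }
      interface Event {}
      interface Changelog { Iterable<Event> range(long fromInclusive, long toExclusive); }
      interface Operator { void apply(Event event); }  // folds one event into state

      static Operator restoreAt(long t, CheckpointStore checkpoints, Changelog input,
          java.util.function.Function<Checkpoint, Operator> fromCheckpoint) {
        Checkpoint checkpoint = checkpoints.latestAtOrBefore(t);
        Operator operator = fromCheckpoint.apply(checkpoint);
        // Replay input events in the range [checkpoint time, t).
        for (Event event : input.range(checkpoint.timestamp(), t)) {
          operator.apply(event);
        }
        return operator;
      }
    }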

Retention is a contract between the consumer and the upstream operators. If
the consumer says to its source operator "I need you to be able to replay any 
time-range from Feb 12th onwards", that operator either needs to store its 
output back to Feb 12th, or it needs to retain the ability to re-create that 
output. If the latter, then it tells *its* input(s) what time-range they need 
to be able to replay, say from Feb 11th. For rapid playback, it may choose to
keep periodic checkpoints.
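
A sketch of how that contract might look in code (made-up class, not an
existing Samza or Kafka API; REPLAY_SLACK stands for however much extra input
this operator needs in order to re-create its own output):

    // Hypothetical sketch of the retention contract between operators.
    class RetentionAwareOperator {
      // How much extra input we need in order to rebuild our output,
      // e.g. the window length of an aggregate (here one day, in millis).
      static final long REPLAY_SLACK = 24L * 60 * 60 * 1000;

      private final java.util.List<RetentionAwareOperator> inputs;
      private long retainFrom = Long.MAX_VALUE;   // nothing promised yet

      RetentionAwareOperator(java.util.List<RetentionAwareOperator> inputs) {
        this.inputs = inputs;
      }

      // A downstream consumer says: "I need you to be able to replay any
      // time-range from 'from' onwards."
      void requireReplayFrom(long from) {
        if (from < retainFrom) {
          retainFrom = from;
          // Either store our own output back to 'from', or retain the
          // ability to re-create it by asking our inputs to keep slightly
          // more (and perhaps keeping periodic checkpoints for fast replay).
          for (RetentionAwareOperator input : inputs) {
            input.requireReplayFrom(from - REPLAY_SLACK);
          }
        }
      }
    }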

If the final consumer loosens its retention requirement, say to Feb 19th
onwards, then each operator propagates the looser requirement to its input
operator(s), and garbage can be collected.

I don't know whether checkpoints and retention are spelled out in Kappa/Liquid, 
but if not, they seem a natural and useful extension to the theory.

Julian


> On Feb 21, 2015, at 4:51 PM, Roger Hoover <roger.hoo...@gmail.com> wrote:
> 
> Thanks, Jay.  This is one of the really nice advantages of local state in my 
> mind.  Full retention would work but eventually run out of space, right?  
> Ideally, Kafka would guarantee to keep dirty keys for a configurable amount 
> of time as Chris suggested.
> 
> Sent from my iPhone
> 
>> On Feb 21, 2015, at 10:10 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>> 
>> Gotcha. Yes if you want to be able to join to past versions you definitely
>> can't turn on compaction as the whole goal of that feature is to delete
>> past versions. But wouldn't it work to use full retention if you want that
>> (and use the MessageChooser interface during reprocessing if you want tight
>> control over the state recreation). I mean you have the same dilemma if you
>> don't use local state but instead use a remote store--the remote store
>> likely only keeps the last version of each value so you can't join to the
>> past.
>> 
>> -Jay
>> 
>> On Fri, Feb 20, 2015 at 9:04 PM, Roger Hoover <roger.hoo...@gmail.com>
>> wrote:
>> 
>>> Jay,
>>> 
>>> Sorry, I didn't explain it very well.  I'm talking about a stream-table
>>> join where the table comes from a compacted topic that is used to populate
>>> a local data store.  As the stream events are processed, they are joined
>>> with dimension data from the local store.
>>> 
>>> If you want to kick off another version of this job that starts back in
>>> time, the new job cannot reliably recreate the same state of the local
>>> store that the original had because old values may have been compacted
>>> away.
>>> 
>>> Does that make sense?
>>> 
>>> Roger
>>> 
>>>> On Fri, Feb 20, 2015 at 2:52 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>>> 
>>>> Hey Roger,
>>>> 
>>>> I'm not sure if I understand the case you are describing.
>>>> 
>>>> As Chris says we don't yet give you fine-grained control over when history
>>>> starts to disappear (though we designed with the intention of making that
>>>> configurable later). However I'm not sure if you need that for the case you
>>>> describe.
>>>> 
>>>> Say you have a job J that takes inputs I1...IN and produces output O1...ON
>>>> and in the process accumulates state in a topic S. I think the approach is
>>>> to launch a J' (changed or improved in some way) that reprocesses I1...IN
>>>> from the beginning of time (or some past point) into O1'...ON' and
>>>> accumulates state in S'. So the state for J and the state for J' are
>>>> totally independent. J' can't reuse J's state in general because the code
>>>> that generates that state may have changed.
>>>> 
>>>> -Jay
>>>> 
>>>> On Thu, Feb 19, 2015 at 9:30 AM, Roger Hoover <roger.hoo...@gmail.com>
>>>> wrote:
>>>> 
>>>>> Chris + Samza Devs,
>>>>> 
>>>>> I was wondering whether Samza could support re-processing as described by
>>>>> the Kappa architecture or Liquid (
>>>>> http://www.cidrdb.org/cidr2015/Papers/CIDR15_Paper25u.pdf).
>>>>> 
>>>>> It seems that a changelog is not sufficient to be able to restore state
>>>>> backward in time.  Kafka compaction will guarantee that local state can be
>>>>> restored from where it left off but I don't see how it can restore past
>>>>> state.
>>>>> 
>>>>> Imagine the case where a stream job has a lot of state in its local store
>>>>> but it has not updated any keys in a long time.
>>>>> 
>>>>> Time t1: All of the data would be in the tail of the Kafka log (past the
>>>>> cleaner point).
>>>>> Time t2:  The job updates some keys.  Now we're in a state where the next
>>>>> compaction will blow away the old values for those keys.
>>>>> Time t3:  Compaction occurs and old values are discarded.
>>>>> 
>>>>> Say we want to launch a re-processing job that would begin from t1.  If we
>>>>> launch that job before t3, it will correctly restore its state.  However,
>>>>> if we launch the job after t3, it will be missing old values, right?
>>>>> 
>>>>> Unless I'm misunderstanding something, the only way around this is to keep
>>>>> snapshots in addition to the changelog.  Has there been any discussion of
>>>>> providing an option in Samza of taking RocksDB snapshots and persisting
>>>>> them to an object store or HDFS?
>>>>> 
>>>>> Thanks,
>>>>> 
>>>>> Roger
>>> 
