I will read through the KIP doc once again to provide more detailed
feedback, but let me throw in my two cents just on the above email.

There are a few motivations for having a "consistent" stop-point across
tasks belonging to different sub-topologies. One of them is interactive
queries: say you have two state stores belonging to two sub-topologies;
if they stopped at different points, then users querying them will see
inconsistent answers (think of the example people always use in
databases: the stores represent A's and B's bank accounts, and a record
is processed that moves X dollars from A to B).
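
To make the interactive-query concern concrete, here is a rough Java
sketch of reading from both stores via the interactive-queries API from
KIP-67. The store names, keys, and the surrounding class are made up
for illustration only:

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.state.QueryableStoreTypes;
    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

    public class BalanceQuerySketch {
        // Sketch: read the balances from two stores that are maintained
        // by two different sub-topologies of the same application.
        static long totalBalance(KafkaStreams streams) {
            ReadOnlyKeyValueStore<String, Long> accountsA =
                streams.store("accounts-A",
                    QueryableStoreTypes.<String, Long>keyValueStore());
            ReadOnlyKeyValueStore<String, Long> accountsB =
                streams.store("accounts-B",
                    QueryableStoreTypes.<String, Long>keyValueStore());
            // If sub-topology A already processed the "move X from A to B"
            // record but sub-topology B stopped at an earlier point, the X
            // dollars show up in neither balance -- a state the application
            // never actually was in.
            Long a = accountsA.get("A");
            Long b = accountsB.get("B");
            return (a == null ? 0L : a) + (b == null ? 0L : b);
        }
    }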

As for the implementation to support such consistent stop-points,
though, I think the metadata field in the offsets topic is worth
exploring, because Streams will very likely use the transactional APIs
proposed in KIP-98 to let producers send offsets in a transactional
manner, rather than the consumers themselves (details can be found here
https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
).
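
Just to illustrate that direction (purely a sketch, not an actual
implementation): assuming the producer APIs end up roughly as proposed
in KIP-98, a task's producer could attach a stop-point marker to the
offset it commits as part of its output transaction. The group id,
metadata encoding, and surrounding class below are made up:

    import java.util.Collections;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.TopicPartition;

    public class TransactionalCommitSketch {
        // Sketch: write the task's output and commit the consumed input
        // offset -- with a stop-point marker in the metadata string -- in
        // one transaction. initTransactions() is assumed to have been
        // called once on startup.
        static void process(KafkaProducer<byte[], byte[]> producer,
                            ProducerRecord<byte[], byte[]> output,
                            TopicPartition input,
                            long consumedOffset,
                            long stopOffset) {
            producer.beginTransaction();
            producer.send(output);
            Map<TopicPartition, OffsetAndMetadata> offsets =
                Collections.singletonMap(input,
                    new OffsetAndMetadata(consumedOffset + 1,
                                          "stopOffset=" + stopOffset));
            producer.sendOffsetsToTransaction(offsets, "my-streams-app");
            producer.commitTransaction();
        }
    }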



Guozhang


On Tue, Dec 6, 2016 at 2:45 PM, Matthias J. Sax <matth...@confluent.io>
wrote:

> Thanks for the input Jay.
>
> From my understanding, your question boils down to how fuzzy the stop
> point can/should be, and what guarantees we want to provide to the user.
> This has multiple dimensions:
>
>
> 1. Using a timestamp has the main issue that, if an input topic has no
> data with this timestamp, the application never finishes (ie, the
> stop-timestamp is in "the future").
>
> Furthermore, it would require the user to specify the timestamp,
> because if we used the current (ie, on startup) system time as the
> stop-timestamp, the case of a "future stop-timestamp" might be very
> common (think of no active producers for a topic -- it's batch
> processing). Thus, the user needs to know the stop-timestamp, which
> might be hard for her to figure out -- in the current design it's way
> simpler for the user to activate "auto stop".
>
> Last but not least, assume an application with two subtopologies that
> are connected via an intermediate topic, and both subtopologies are
> executed in different JVMs. The first subtopology could filter a lot of
> messages, and thus it might happen that it never writes a record with
> timestamp >= stop-timestamp into the intermediate topic. Thus, even if
> the first JVM terminates, the second would not stop automatically, as
> it never reads a record with timestamp >= stop-timestamp.
>
> There would be some workaround if we shut down in a "fuzzy" way, ie,
> with no guarantees about which records actually get processed (ie, stop
> processing somewhat early in some cases). But I will argue in (3) why
> this "stop roughly about this time" semantic is not a good idea.
>
>
> 2. I was not aware of a metadata field for committed offsets, and this
> sounds quite appealing. However, thinking about it in more detail, I
> have my doubts that we can use it:
>
> If we want to propagate stop-offsets for intermediate topics, all
> producer instances would need to update this metadata field and thus
> would need to commit (a producer that commits? Well, we could use the
> "restore consumer" with manual partition assignment for this) --
> however, these commits would not only conflict with the commits of the
> actual consumer, but also with each other across all running producers.
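>
> (For reference, committing an offset together with a metadata string
> through the plain consumer API could look roughly like the sketch
> below; the group setup, partition, and encoding are made up. Note that
> each such commit overwrites the previously stored metadata, which is
> exactly where the conflicts described above come from.)
>
>     import java.util.Collections;
>     import org.apache.kafka.clients.consumer.KafkaConsumer;
>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>     import org.apache.kafka.common.TopicPartition;
>
>     public class StopPointCommitSketch {
>         // Sketch: a side consumer with manual partition assignment (no
>         // group management) writes a stop-offset marker into the
>         // metadata field of the committed offset for an intermediate
>         // topic partition.
>         static void writeStopPoint(KafkaConsumer<byte[], byte[]> consumer,
>                                    TopicPartition intermediatePartition,
>                                    long stopOffset) {
>             consumer.assign(Collections.singleton(intermediatePartition));
>             consumer.commitSync(Collections.singletonMap(
>                 intermediatePartition,
>                 new OffsetAndMetadata(stopOffset,
>                                       "stopOffset=" + stopOffset)));
>         }
>     }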
>
>
> 3. This is the overall "how fuzzy do we want to be" discussion. I would
> argue that we should provide somewhat strong stop consistency. Assume
> the following use case: an external application generates data in
> batches and writes files to HDFS. Those files are imported into Kafka
> via Connect. Each time a batch of data gets inserted into Kafka, this
> data should be processed with a Streams application. If we cannot
> guarantee that all data of this batch is fully processed and the result
> is complete, the user experience would be quite bad.
>
> Furthermore, we want to guard a running batch job against processing
> "too much" data: assume the same scenario as before with HDFS +
> Connect. For whatever reason, a Streams batch job takes longer than
> usual, and while it is running, new data is appended to the topic as
> new files (of the next batch) are already available. We don't want to
> process this data with the currently running app, but want it to be
> included in the next batch run (you could imagine that each batch job
> writes its result into a different output topic). Thus, we want to have
> all data processed completely, ie, provide strong start-stop
> consistency.
>
>
> Sorry for the quite long answer. Hope it is convincing though :)
>
>
> -Matthias
>
>
> On 12/5/16 4:44 PM, Jay Kreps wrote:
> > I'd like to second the discouragement of adding a new topic per job. We
> > went down this path in Samza and I think the result was quite a mess. You
> > had to read the full topic every time a job started and so it added a lot
> > of overhead and polluted the topic space.
> >
> > What if we did the following:
> >
> >    1. Use timestamp instead of offset
> >    2. Store the "stopping timestamp" in the metadata field associated
> >    with the existing offset storage mechanism
> >    3. Don't worry about fully processing the entire DAG. After all,
> >    partially processing a tuple isn't much different from not
> >    processing it, and in any case the stopping point is a heuristic,
> >    so no point in being overly precise here.
> >
> > Probably I'm missing something, though; I haven't thought through the
> > implications of using time instead of offset.
> >
> > -Jay
> >
> > On Mon, Nov 28, 2016 at 10:47 AM, Matthias J. Sax <matth...@confluent.io>
> > wrote:
> >
> >> Hi all,
> >>
> >> I want to start a discussion about KIP-95:
> >>
> >> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> >>
> >> Looking forward to your feedback.
> >>
> >>
> >> -Matthias
> >>
> >>
> >>
> >
>
>


-- 
-- Guozhang
