About using the offset-topic metadata field:

Even with KIP-98, I think this would not work (or only in a weird way).
If we have the following topology:

topic1 -> subTopologyA -> topic2 -> subTopologyB

If the producer of subTopologyA commits, it commits its input offsets
from topic1. Thus, the stop-offsets of topic2 for subTopologyB would be
committed as metadata of the topic1 commits. But subTopologyB is not
really related to topic1. It would not be impossible to make this work,
but the design seems somewhat weird.
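
To make the mismatch concrete, here is a minimal sketch (the metadata
encoding and method name are made up; this is not how Streams actually
commits today):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    // subTopologyA's consumer reads topic1, so topic1 partitions are the
    // only place it can attach commit metadata -- even if that metadata
    // describes stop-offsets of topic2.
    static void commitWithStopOffset(Consumer<byte[], byte[]> consumerA) {
        consumerA.commitSync(Collections.singletonMap(
            new TopicPartition("topic1", 0),
            new OffsetAndMetadata(1234L, "stopOffsets:topic2-0=5678")));
    }
    // subTopologyB consumes topic2, but its stop-offset would live in the
    // commit metadata of topic1 -- a topic it is otherwise unrelated to.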

But maybe I am missing something.

Furthermore, I am not sure if Streams will always use transactions.
Will there be an option for the user to disable transactions and stick
with at-least-once processing?

Also, we would need to delay this KIP until KIP-98 and a Streams EOS
KIP are in place... I would rather include this in the next release,
0.10.2.


-Matthias

On 12/9/16 10:47 AM, Guozhang Wang wrote:
> I will read through the KIP doc once again to provide more detailed
> feedback, but let me throw in my two cents just on the above email.
> 
> There are a few motivations for having a "consistent" stop-point across tasks
> belonging to different sub-topologies. One of them is interactive
> queries: say you have two state stores belonging to two sub-topologies. If
> they stopped at different points, then users querying them will see
> inconsistent answers (think of the example people always use in
> databases: the stores represent A's and B's bank accounts, and a record is
> processed to move X dollars from A to B).
> 
> As for the implementation to support such consistent stop-points, though, I
> think the metadata field in the offset topic is worth exploring, because
> Streams may very likely use the transactional APIs proposed in KIP-98 to
> let producers send offsets in a transactional manner, rather than the
> consumers themselves (details can be found here:
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> ).
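> 
> To make that concrete, here is a rough sketch of the producer-side offset
> commit proposed in KIP-98 (API names are taken from the proposal and may
> still change; producer, key, and value are assumed to be in scope):
> 
>     import java.util.Collections;
>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>     import org.apache.kafka.clients.producer.ProducerRecord;
>     import org.apache.kafka.common.TopicPartition;
> 
>     // One transaction covers both the output records and the input
>     // offsets; the producer commits offsets on behalf of the consumer
>     // group ("my-app" is a placeholder group id).
>     producer.beginTransaction();
>     producer.send(new ProducerRecord<>("topic2", key, value));
>     producer.sendOffsetsToTransaction(
>         Collections.singletonMap(
>             new TopicPartition("topic1", 0),
>             new OffsetAndMetadata(1234L, "stop-point metadata could go here")),
>         "my-app");
>     producer.commitTransaction();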
> 
> 
> 
> Guozhang
> 
> 
> On Tue, Dec 6, 2016 at 2:45 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
> 
>> Thanks for the input Jay.
>>
>> From my understanding, your question boils down to how fuzzy the stop
>> point can/should be, and what guarantees we want to provide to the user.
>> This has multiple dimensions:
>>
>>
>> 1. The main issue with using a timestamp is that if an input topic has
>> no data with this timestamp, the application never finishes (i.e., the
>> stop timestamp is in "the future").
>>
>> Furthermore, it would require the user to specify the timestamp, because
>> if we used the current system time (i.e., at startup) as the stop-timestamp,
>> the case of a "future stop-timestamp" might be very common (think of no
>> active producers for a topic -- it's batch processing). Thus, the user
>> needs to know the stop-timestamp, which might be hard to figure out -- in
>> the current design it's way simpler for the user to activate "auto stop".
>>
>> Last but not least, assume an application with two subtopologies that
>> are connected via an intermediate topic and executed in different JVMs.
>> The first subtopology could filter a lot of messages, so it might happen
>> that it never writes a record with timestamp >= stop-timestamp into the
>> intermediate topic. Thus, even if the first JVM terminates, the second
>> would not stop automatically, as it never reads a record with timestamp
>> >= stop-timestamp.
>>
>> There would be some workaround if we shut down in a "fuzzy" way, i.e.,
>> with no guarantees about which records actually get processed (i.e., we
>> might stop processing somewhat early in some cases). But I will argue in
>> (3) why this "stop roughly about this time" semantic is not a good idea.
>>
>>
>> 2. I was not aware of a metadata field for committed offsets, and it
>> sounds quite appealing. However, thinking about it in more detail, I
>> have my doubts that we can use it:
>>
>> If we want to propagate stop-offsets for intermediate topics, all
>> producer instances would need to update this metadata field and thus
>> need to commit (a producer that commits? Well, we could use the "restore
>> consumer" with manual partition assignment for this) -- however, these
>> commits would conflict not only with the commits of the actual consumer,
>> but also with each other across all running producers.
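>>
>> Just to sketch the workaround I have in mind (hypothetical -- the names
>> and the metadata encoding are made up):
>>
>>     import java.util.Collections;
>>     import org.apache.kafka.clients.consumer.Consumer;
>>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>>     import org.apache.kafka.common.TopicPartition;
>>
>>     // A producer-side "commit": a second consumer with manual partition
>>     // assignment (no group subscription) writes the stop-offset as
>>     // commit metadata for the intermediate topic.
>>     static void publishStopOffset(Consumer<byte[], byte[]> restoreConsumer) {
>>         TopicPartition tp = new TopicPartition("intermediate-topic", 0);
>>         restoreConsumer.assign(Collections.singleton(tp));
>>         restoreConsumer.commitSync(Collections.singletonMap(
>>             tp, new OffsetAndMetadata(9999L, "stopOffset=9999")));
>>     }
>>     // All producer instances plus the actual consumer group would race
>>     // on this one per-partition metadata field -- hence my doubts.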
>>
>>
>> 3. This is the overall "how fuzzy do we want to be" discussion. I would
>> argue that we should provide somewhat strong stop consistency. Assume
>> the following use case: an external application generates data in
>> batches and writes files to HDFS. Those files are imported into Kafka
>> via Connect. Each time a batch of data gets inserted into Kafka, this
>> data should be processed with a Streams application. If we cannot
>> guarantee that all data of this batch is fully processed and the result
>> is complete, the user experience would be quite bad.
>>
>> Furthermore, we want to guard against a running batch job processing
>> "too much" data. Assume the same scenario as before with HDFS + Connect:
>> for whatever reason, a Streams batch job takes longer than usual, and
>> while it is running, new data is appended to the topic as new files (of
>> the next batch) become available. We don't want to process this data
>> with the currently running app, but want it to be included in the next
>> batch run (you could imagine that each batch job writes its result into
>> a different output topic). Thus, we want to have all data processed
>> completely, i.e., provide strong start-stop consistency.
>>
>>
>> Sorry for the rather long answer. I hope it is convincing, though :)
>>
>>
>> -Matthias
>>
>>
>> On 12/5/16 4:44 PM, Jay Kreps wrote:
>>> I'd like to second the discouragement of adding a new topic per job. We
>>> went down this path in Samza, and I think the result was quite a mess. You
>>> had to read the full topic every time a job started, so it added a lot
>>> of overhead and polluted the topic space.
>>>
>>> What if we did the following:
>>>
>>>    1. Use a timestamp instead of an offset.
>>>    2. Store the "stopping timestamp" in the metadata field associated
>>>    with the existing offset storage mechanism (see the sketch below).
>>>    3. Don't worry about fully processing the entire DAG. After all,
>>>    partially processing a tuple isn't much different from not processing
>>>    it, and in any case the stopping point is a heuristic, so there's no
>>>    point in being overly precise here.
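>>>
>>> (To illustrate 1+2, here is a rough sketch; the metadata encoding and
>>> method are invented, and offsetsForTimes() from KIP-79 is just one way
>>> to turn a timestamp back into offsets:)
>>>
>>>     import java.util.Collections;
>>>     import java.util.Map;
>>>     import org.apache.kafka.clients.consumer.Consumer;
>>>     import org.apache.kafka.clients.consumer.OffsetAndMetadata;
>>>     import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
>>>     import org.apache.kafka.common.TopicPartition;
>>>
>>>     // Store the stop timestamp next to the regular committed offset.
>>>     static void storeStopPoint(Consumer<byte[], byte[]> consumer,
>>>                                long stopTimestampMs, long currentOffset) {
>>>         TopicPartition tp = new TopicPartition("input", 0);
>>>         consumer.commitSync(Collections.singletonMap(tp,
>>>             new OffsetAndMetadata(currentOffset,
>>>                 "stopTimestamp=" + stopTimestampMs)));
>>>         // If a task ever needs a concrete stop offset, it can resolve
>>>         // the timestamp via offsetsForTimes() (null handling omitted).
>>>         Map<TopicPartition, OffsetAndTimestamp> stopOffsets =
>>>             consumer.offsetsForTimes(
>>>                 Collections.singletonMap(tp, stopTimestampMs));
>>>         long stopOffset = stopOffsets.get(tp).offset();
>>>     }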
>>>
>>> Probably I'm missing something, though; I haven't thought through the
>>> implications of using time instead of offset.
>>>
>>> -Jay
>>>
>>> On Mon, Nov 28, 2016 at 10:47 AM, Matthias J. Sax <matth...@confluent.io>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I want to start a discussion about KIP-95:
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>>
>>>> -Matthias
>>>>
>>>>
>>>>
>>>
>>
>>
> 
> 
