About using the offset-topic metadata field: even with KIP-98, I think this
would not work (or only in a weird way). If we have the following topology:

topic1 -> subTopologyA -> topic2 -> subTopologyB

If the producer of subTopologyA commits, it commits its input offsets from
topic1. Thus, the stop-offsets of topic2 for subTopologyB would be committed
with the metadata of the topic1 commits. But subTopologyB is not really
related to topic1. I guess it would not be impossible to make this work;
however, the design seems somewhat weird. But maybe I am missing something.

Furthermore, I am not sure if Streams will always use transactions. Will
there be an option for the user to disable transactions and stick with
at-least-once processing?

Also, we would need to delay this KIP until KIP-98 and a Streams EOS KIP are
in place... I would rather include this in the next release, 0.10.2.

-Matthias

On 12/9/16 10:47 AM, Guozhang Wang wrote:
> I will read through the KIP doc once again to provide more detailed
> feedback, but let me throw in my two cents on the above email.
>
> There are a few motivations to have a "consistent" stop point across tasks
> belonging to different sub-topologies. One of them is interactive queries:
> say you have two state stores belonging to two sub-topologies. If they
> stopped at different points, then users querying them will see
> inconsistent answers (think about the example people always use in
> databases: the stores represent A's and B's bank accounts, and a record is
> processed to move X dollars from A to B).
>
> As for the implementation to support such consistent stop points, I think
> the metadata field in the offset topic is worth exploring, because Streams
> may very likely use the transactional APIs proposed in KIP-98 to let
> producers send offsets in a transactional manner, not the consumers
> themselves (details can be found here:
> https://docs.google.com/document/d/11Jqy_GjUGtdXJK94XGsEIK7CP1SnQGdp2eF0wSw9ra8
> ).
>
> Guozhang
>
> On Tue, Dec 6, 2016 at 2:45 PM, Matthias J. Sax <matth...@confluent.io>
> wrote:
>
>> Thanks for the input Jay.
>>
>> From my understanding, your question boils down to how fuzzy the stop
>> point can/should be, and what guarantees we want to provide to the user.
>> This has multiple dimensions:
>>
>> 1. Using a timestamp has the main issue that if an input topic has no
>> data with this timestamp, the application never finishes (i.e., the stop
>> timestamp is in "the future").
>>
>> Furthermore, it would require the user to specify the timestamp, because
>> if we use the current (i.e., on startup) system time as the stop
>> timestamp, the case of a "future stop timestamp" might be very common
>> (think: no active producers for a topic -- it's batch processing). Thus,
>> the user needs to know the stop timestamp, which might be hard for her
>> to figure out -- in the current design it's way simpler for the user to
>> activate "auto stop".
>>
>> Last but not least, assume an application with two subtopologies that
>> are connected via an intermediate topic and executed in different JVMs.
>> The first subtopology could filter a lot of messages, and thus it might
>> happen that it never writes a record with timestamp >= stop-timestamp
>> into the intermediate topic. Thus, even if the first JVM terminates, the
>> second would not stop automatically, as it never reads a record with
>> timestamp >= stop-timestamp.
>>
>> There would be a workaround if we shut down in a "fuzzy way", i.e., with
>> no guarantees which records actually get processed (i.e., stop
>> processing somewhat early in some cases). But I will argue in (3) why
>> this "stop roughly about this time" semantic is not a good idea.
>>
>> 2. I was not aware of a metadata field for committed offsets, and this
>> sounds quite appealing. However, thinking about it in more detail, I
>> have my doubts we can use it:
>>
>> If we want to propagate stop-offsets for intermediate topics, all
>> producer instances would need to update this metadata field, and thus
>> need to commit. (A producer that commits? Well, we could use the
>> "restore consumer" with manual partition assignment for this.) However,
>> this would not only conflict with the commits of the actual consumer,
>> but also between all running producers.
>>
>> 3. This is the overall "how fuzzy do we want to be" discussion. I would
>> argue that we should provide somewhat strong stop consistency. Assume
>> the following use case: an external application generates data in
>> batches and writes files to HDFS. Those files are imported into Kafka
>> via Connect. Each time a batch of data gets inserted into Kafka, this
>> data should be processed with a Streams application. If we cannot
>> guarantee that all data of this batch is fully processed and the result
>> is complete, the user experience would be quite bad.
>>
>> Furthermore, we want to guard against a running batch job processing
>> "too much" data: assume the same scenario as before with HDFS + Connect.
>> For whatever reason, a Streams batch job takes longer than usual, and
>> while it is running, new data is appended to the topic as new files (of
>> the next batch) become available. We don't want to process this data
>> with the currently running app, but want it to be included in the next
>> batch run (you could imagine that each batch job writes its result into
>> a different output topic). Thus, we want to have all data processed
>> completely, i.e., provide strong start-stop consistency.
>>
>> Sorry for the quite long answer. Hope it is convincing though :)
>>
>> -Matthias
>>
>> On 12/5/16 4:44 PM, Jay Kreps wrote:
>>> I'd like to second the discouragement of adding a new topic per job.
>>> We went down this path in Samza and I think the result was quite a
>>> mess. You had to read the full topic every time a job started, so it
>>> added a lot of overhead and polluted the topic space.
>>>
>>> What if we did the following:
>>>
>>> 1. Use a timestamp instead of an offset.
>>> 2. Store the "stopping timestamp" in the metadata field associated
>>> with the existing offset storage mechanism.
>>> 3. Don't worry about fully processing the entire DAG. After all,
>>> partially processing a tuple isn't much different from not processing
>>> it, and in any case the stopping point is a heuristic, so no point in
>>> being overly precise here.
>>>
>>> Probably I'm missing something, though; I haven't thought through the
>>> implications of using time instead of offset.
>>>
>>> -Jay
>>>
>>> On Mon, Nov 28, 2016 at 10:47 AM, Matthias J. Sax
>>> <matth...@confluent.io> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I want to start a discussion about KIP-95:
>>>>
>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
>>>>
>>>> Looking forward to your feedback.
>>>>
>>>> -Matthias
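To make the metadata-field idea discussed in this thread concrete: Kafka's committed offsets carry a free-form string metadata field (the `OffsetAndMetadata` metadata), and a stop point could be serialized into it. The following is a hypothetical, broker-free sketch of just the encode/decode/stop-check logic; the class and method names (`StopPointMetadata`, `shouldStop`) are invented for illustration and are not part of any Kafka or Kafka Streams API.

```java
// Hypothetical sketch: serialize a per-partition stop-offset into the
// free-form string metadata stored next to each committed offset.
// All names here are invented for illustration.
public class StopPointMetadata {

    // Serialize a stop-offset so it can travel inside the commit metadata.
    static String encode(long stopOffset) {
        return "stop=" + stopOffset;
    }

    // Recover the stop-offset from committed metadata; -1 means "no stop point".
    static long decode(String metadata) {
        if (metadata == null || !metadata.startsWith("stop=")) {
            return -1L;
        }
        return Long.parseLong(metadata.substring("stop=".length()));
    }

    // A task stops once the next offset it would consume reaches the stop-offset.
    static boolean shouldStop(long nextOffsetToConsume, String metadata) {
        long stop = decode(metadata);
        return stop >= 0 && nextOffsetToConsume >= stop;
    }

    public static void main(String[] args) {
        String meta = encode(100L);
        System.out.println(meta);                   // stop=100
        System.out.println(shouldStop(99L, meta));  // false: not yet reached
        System.out.println(shouldStop(100L, meta)); // true: stop point reached
        System.out.println(shouldStop(5L, null));   // false: no stop point set
    }
}
```

Note that this sketch sidesteps the objection raised above: with multiple sub-topologies, the commit that carries this metadata for topic2 would be keyed to topic1's offsets, so where the encoded stop point actually lands is exactly the open design question.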