I agree that the metadata topic is required to build an intuitive batching semantic.

One question on the config autostop.at: I see only one value for it, eol. What other values can be used? Instead, why would we not have batch.mode=true/false?
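For concreteness, this is how I read the proposed config being used. It is only a sketch: autostop.at and its eol value are the KIP's proposed names, not an existing API, and the application id and topic names are made up.

    import java.util.Properties;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class BatchRun {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-batch-app");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            // proposed in KIP-95 (not an existing config): stop each run at the
            // log-end offsets ("eol") captured when the application starts
            props.put("autostop.at", "eol");

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("input-topic").to("output-topic");

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start(); // per the KIP, processing stops at the captured offsets
        }
    }

I could imagine the enum form leaving room for other stop points later (e.g., a timestamp), which a boolean batch.mode would not, but that is speculation on my part.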
On Wed, Nov 30, 2016 at 1:51 PM, Matthias J. Sax <matth...@confluent.io> wrote:

> Both types of intermediate topics are handled the exact same way and both connect different subtopologies (even if the user might not be aware that there are multiple subtopologies in the case of internal data repartitioning). So there is no distinction between user intermediate topics (via through()) and internal intermediate repartitioning topics.
>
> I also do not understand your argument about "coupling instances". The only "synchronization" is at startup time, until the marker is written. Afterwards all instances just run as always. Furthermore, the metadata topic will be written by the leader while it computes the overall partition assignment. Thus, the metadata topic will be fully populated (including the marker) before the individual instances receive their assignments via the group management protocol. So there is no more "synchronization" than before, as group management synchronizes instances at startup anyway.
>
> About startup failure: yes, there is the case that the leader could fail before the marker gets written. For this case, we have to consider a few things:
>
> 1. The net effect is that no data will be processed by any instance (no instance can start processing, because no partition assignment will be distributed via group management, as the leader failed while computing the assignment).
>
> 2. The failure would occur during partition assignment, which would be a severe failure anyway; the application has bigger problems than a missing marker in the metadata topic (nobody will get partitions assigned, as the leader did not finish the assignment computation).
>
> 3. If the leader fails, a different instance will become the leader:
> a) if it is a permanent problem, eventually all instances will go down;
> b) if the problem is transient, the probability is very high that the new leader will not fail.
>
> -Matthias
>
> On 11/30/16 1:21 PM, Eno Thereska wrote:
> > In the KIP, two types of intermediate topics are described: 1) ones that connect two sub-topologies, and 2) others that are internal repartitioning topics (e.g., for joins). I wasn't envisioning stopping the consumption of (2) at the HWM. The HWM can be used for the source topics only (so I agree with your "joins" scenario, but for a different reason).
> >
> > The case I am worried about is (1), when there are implicit connections between application instances, where a 2nd instance's source topics would be the 1st instance's output topics. In that case I was suggesting not to couple those instances.
> >
> > In the (corner) case when the application fails repeatedly, it can still fail right before we write to the metadata topic, so that corner case can still happen. However, it should be extremely rare, and I'd argue that if the app is failing repeatedly N times there are bigger problems with the app.
> >
> > Eno
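To make the two kinds of intermediate topics concrete, here is a minimal sketch against the 0.10.1 DSL; the topic names and the toy key change are made up for illustration:

    import org.apache.kafka.streams.KeyValue;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class Subtopologies {
        public static void main(String[] args) {
            KStreamBuilder builder = new KStreamBuilder();

            // type (1): an explicit user intermediate topic; the operators before
            // and after through() land in different subtopologies
            KStream<String, String> input = builder.stream("input-topic");
            KStream<String, String> rerouted = input.through("user-intermediate-topic");

            // type (2): changing the key forces an internal repartitioning topic
            // before the aggregation, again splitting the topology in two
            rerouted.map((k, v) -> new KeyValue<>(v, v))
                    .groupByKey()
                    .count("counts-store");
        }
    }

Without the marker, an instance consuming "user-intermediate-topic" in this sketch would stop at whatever offsets it sees at startup, i.e., before the upstream subtopology of the same run has produced anything; that is Damian's "one run behind" point below.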
> >> On 30 Nov 2016, at 11:52, Damian Guy <damian....@gmail.com> wrote:
> >>
> >> I think the KIP looks good. I also think we need the metadata topic in order to provide sane guarantees on what data will be processed.
> >>
> >> As Matthias has outlined in the KIP, we need to know when to stop consuming from intermediate topics, i.e., topics that are part of the same application but are used for repartitioning or through() etc. Without the metadata topic, the consumption from the intermediate topics would always be one run behind. In the case of a join requiring repartitioning, this would result in no output for the first run, and then in subsequent runs you'd get the output from the previous run. I'd find this a bit odd.
> >>
> >> Also, having a fixed HWM is IMO a good idea. If you are running your streams app in some shared environment, you don't want to get into a situation where the app fails (for random reasons), restarts with a new HWM, fails, restarts... and then continues to consume resources forever as the HWM is constantly moving forward. So I think the approach in the KIP helps batch-mode streams apps to be good citizens when running in shared environments.
> >>
> >> Thanks,
> >> Damian
> >>
> >> On Wed, 30 Nov 2016 at 10:40 Eno Thereska <eno.there...@gmail.com> wrote:
> >>
> >>> Hi Matthias,
> >>>
> >>> I like the first part of the KIP. However, the second part with the failure modes and metadata topic is quite complex, and I'm worried it doesn't solve the problems you mention under failure. For example, the application can fail before writing to the metadata topic. In that case, it is not clear what the second app instance should do (for the handling of the intermediate topics case). So in general, we have the problem of failures during writes to the metadata topic itself.
> >>>
> >>> Also, for the intermediate topics example, I feel like we are trying to provide some sort of synchronisation between app instances with this approach. By default today such synchronisation does not exist. One instance writes to the intermediate topic, and the other reads from it, but only eventually. That is a nice way to decouple instances, in my opinion.
> >>>
> >>> The user can always run the batch processing multiple times, and eventually all instances will produce some output. The user's app can check whether the output size is satisfactory and then not run any further loops. So I feel they can already get a lot with the simpler first part of the KIP.
> >>>
> >>> Thanks
> >>> Eno
> >>>
> >>>> On 30 Nov 2016, at 05:45, Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>
> >>>> Thanks for your input.
> >>>>
> >>>> To clarify: the main reason to add the metadata topic is to cope with subtopologies that are connected via intermediate topics (either user-defined via through() or internally created for data repartitioning).
> >>>>
> >>>> Without this handling, the behavior would be odd and the user experience would be bad.
> >>>>
> >>>> Thus, using the metadata topic to have a "fixed HWM" is just a small add-on, and more or less for free, because the metadata topic is already there.
> >>>>
> >>>> -Matthias
> >>>> On 11/29/16 7:53 PM, Neha Narkhede wrote:
> >>>>> Thanks for initiating this. I think this is a good first step towards unifying batch and stream processing in Kafka.
> >>>>>
> >>>>> I understood this capability to be simple yet very useful; it allows a Streams program to process a log, in batch, in arbitrary windows defined by the difference between the HWM and the current offset. Basically, it provides a simple means for a Streams program to "stop" after processing a batch (just like a batch program would) and continue where it left off when restarted. In other words, it allows batch-processing behavior for a Streams app without code changes.
> >>>>>
> >>>>> This feature is useful, but I do not think there is a necessity to add a metadata topic. After all, the user doesn't really care that much about exactly where the batch ends. This feature allows an app to "process as much as there is data to process", and the way it determines how much data there is to process is by reading the HWM on startup. If there is new data written to the log right after it starts up, it will process it when restarted the next time. If it starts, reads the HWM, but fails, it will restart and process a little more before it stops again. The fact that the HWM changes in some scenarios isn't an issue, since a batch program that behaves this way doesn't really care exactly what that HWM is.
> >>>>>
> >>>>> There might be cases which require adding more topics, but I would shy away from that wherever possible, as extra topics complicate operations and reduce simplicity.
> >>>>>
> >>>>> Other than this issue, I'm +1 on adding this feature. I think it is pretty powerful.
> >>>>>
> >>>>> On Mon, Nov 28, 2016 at 10:48 AM Matthias J. Sax <matth...@confluent.io> wrote:
> >>>>>
> >>>>>> Hi all,
> >>>>>>
> >>>>>> I want to start a discussion about KIP-95:
> >>>>>>
> >>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-95%3A+Incremental+Batch+Processing+for+Kafka+Streams
> >>>>>>
> >>>>>> Looking forward to your feedback.
> >>>>>>
> >>>>>> -Matthias
> >>>>>
> >>>>> --
> >>>>> Thanks,
> >>>>> Neha
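On Neha's description of the stopping rule: the behavior without a metadata topic can be sketched with plain consumer calls. This is just an illustration of "read the HWM on startup and stop there", not the KIP's implementation; the topic, group, and partition choices are made up.

    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class HwmBatchRead {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "my-batch-app");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            List<TopicPartition> partitions = Arrays.asList(new TopicPartition("input-topic", 0));
            consumer.assign(partitions);

            // capture the high watermarks once, on startup
            Map<TopicPartition, Long> hwms = consumer.endOffsets(partitions);

            boolean done = false;
            while (!done) {
                ConsumerRecords<String, String> records = consumer.poll(1000);
                for (ConsumerRecord<String, String> record : records) {
                    // process(record);
                }
                // stop once every partition has reached its captured HWM;
                // data appended after startup is left for the next run
                done = partitions.stream().allMatch(tp -> consumer.position(tp) >= hwms.get(tp));
            }
            consumer.close();
        }
    }

A crash before finishing just means the next run reads a fresh HWM and processes a little more, which matches Neha's point that the exact stopping offset does not matter.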