Good to know, thanks Matthias! You mentioned the previous operator, but what about `peek().mapValues().peek()`? Will both `peek`s run on the same thread as well?
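To make the threading question concrete, here is a toy model in plain Java. It is not Kafka Streams code. A record is pushed synchronously through `peek -> mapValues -> peek`, and each node hands its result straight to its child, which is the chaining Matthias describes in the reply quoted below. All names (`ToyNode`, `ToyPeek`, `ToyMapValues`, `PeekThreadDemo`) are hypothetical. The model only shows why synchronous forwarding keeps consecutive operators on one thread. As Matthias notes, Kafka Streams gives no public contract about this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Toy model, not Kafka Streams code: each node handles a record and then
// synchronously hands the result to its child, so one record travels the
// whole chain on the call stack of the thread that started it.
interface ToyNode<V> {
    void process(V value);
}

final class ToyPeek<V> implements ToyNode<V> {
    private final Consumer<V> action;
    private final ToyNode<V> child; // null for the last node in the chain

    ToyPeek(Consumer<V> action, ToyNode<V> child) {
        this.action = action;
        this.child = child;
    }

    @Override
    public void process(V value) {
        action.accept(value); // side effect only; the value is passed on unchanged
        if (child != null) {
            child.process(value);
        }
    }
}

final class ToyMapValues<V, R> implements ToyNode<V> {
    private final Function<V, R> mapper;
    private final ToyNode<R> child;

    ToyMapValues(Function<V, R> mapper, ToyNode<R> child) {
        this.mapper = mapper;
        this.child = child;
    }

    @Override
    public void process(V value) {
        child.process(mapper.apply(value));
    }
}

final class PeekThreadDemo {
    /** Runs peek -> mapValues -> peek on one worker thread; returns the thread name each peek saw. */
    static List<String> run() {
        List<String> seen = new ArrayList<>();
        ToyNode<Integer> secondPeek =
                new ToyPeek<>(v -> seen.add(Thread.currentThread().getName()), null);
        ToyNode<String> firstPeek = new ToyPeek<>(
                v -> seen.add(Thread.currentThread().getName()),
                new ToyMapValues<>(String::length, secondPeek));

        Thread worker = new Thread(() -> firstPeek.process("hello"), "stream-thread-1");
        worker.start();
        try {
            worker.join(); // join() also makes the worker's writes to 'seen' visible here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [stream-thread-1, stream-thread-1]
    }
}
```

Both peeks share one call stack in this model. That means `System.nanoTime()` readings taken in them could bracket the `mapValues` step, which is the low-tech timing idea Bill raises further down the thread.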

On Tue, Sep 25, 2018 at 23:14, Matthias J. Sax (<matth...@confluent.io>) wrote:

> Just for clarification:
>
> `peek()` would run on the same thread as the previous operator. Even if, strictly speaking, there is no public contract guaranteeing this, it is the case in the current implementation, and I don't see any reason why this would change in the future, because it's the most efficient implementation I can think of.
>
> -Matthias
>
> On 9/22/18 4:51 AM, Jorge Esteban Quilcate Otoya wrote:
> > Thanks, everyone!
> >
> > @Bill, the main issue with using `KStream#peek()` is that, AFAIK, each `peek` processor runs on a potentially different thread, so passing the trace between them could be challenging. It would also require users to add these operators themselves, which could be too cumbersome.
> >
> > @Guozhang and @John: I will first focus on creating the `TracingProcessorSupplier` for instrumenting custom `Processor`s, and I will keep the idea of a `ProcessorInterceptor` in the back of my head to see if it makes sense to propose a KIP for it.
> >
> > Thanks again for your feedback!
> >
> > Cheers,
> > Jorge.
> >
> > On Wed, Sep 19, 2018 at 1:55, Bill Bejeck (<bbej...@gmail.com>) wrote:
> >
> >> Jorge:
> >>
> >> I have a crazy idea off the top of my head.
> >>
> >> Would something as low-tech as using `KStream#peek` calls on either side of certain processors to record start and end times work?
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> Jorge:
> >>>
> >>> My suggestion was to let your users implement `TracingProcessorSupplier` / `TracingProcessor` directly instead of the baseline `ProcessorSupplier` / `Processor`. Would that work for you?
> >>>
> >>> Guozhang
> >>>
> >>> On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> >>>
> >>>> Thanks, Guozhang and John.
> >>>>
> >>>> @Guozhang:
> >>>>
> >>>>> I'd suggest providing a WrapperProcessorSupplier for the users rather than modifying InternalStreamsTopology: more specifically, you can provide an `abstract WrapperProcessorSupplier implements ProcessorSupplier` and then let users instantiate this class instead of the "bare-metal" interface. WDYT?
> >>>>
> >>>> Yes, in the gist, I have a class implementing `ProcessorSupplier`:
> >>>>
> >>>> ```
> >>>> import brave.kafka.clients.KafkaTracing;
> >>>> import org.apache.kafka.streams.processor.Processor;
> >>>> import org.apache.kafka.streams.processor.ProcessorSupplier;
> >>>>
> >>>> public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
> >>>>   final KafkaTracing kafkaTracing;
> >>>>   final String name;
> >>>>   final ProcessorSupplier<K, V> delegate;
> >>>>
> >>>>   public TracingProcessorSupplier(KafkaTracing kafkaTracing, String name, ProcessorSupplier<K, V> delegate) {
> >>>>     this.kafkaTracing = kafkaTracing;
> >>>>     this.name = name;
> >>>>     this.delegate = delegate;
> >>>>   }
> >>>>
> >>>>   // Each call wraps a fresh delegate processor in a TracingProcessor.
> >>>>   @Override public Processor<K, V> get() {
> >>>>     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
> >>>>   }
> >>>> }
> >>>> ```
> >>>>
> >>>> My challenge is how to wrap the topology processors created by `StreamsBuilder#build` so that this instrumentation is easy for Kafka Streams users to adopt.
> >>>>
> >>>> @John:
> >>>>
> >>>>> The diff you posted only contains the library-side changes, and it's not obvious how you would use this to insert the desired tracing code. Perhaps you could provide a snippet demonstrating how you want to use this change to enable tracing?
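The decorator idea behind `TracingProcessorSupplier` can be sketched without Kafka or Brave on the classpath. This is a simplified model under stated assumptions. `SimpleProcessor`, `SimpleProcessorSupplier`, `SpanRecorder`, `SimpleTracingProcessor`, and `SimpleTracingSupplier` are hypothetical stand-ins. The real Processor API also has `init` and `close`, and timing with `System.nanoTime()` stands in for real span creation.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for the Processor API (no init/close, no context).
interface SimpleProcessor<K, V> {
    void process(K key, V value);
}

interface SimpleProcessorSupplier<K, V> {
    SimpleProcessor<K, V> get();
}

// Hypothetical span sink standing in for a tracing backend such as Zipkin/Brave.
final class SpanRecorder {
    static final class Span {
        final String name;
        final long durationNanos;
        final boolean failed;

        Span(String name, long durationNanos, boolean failed) {
            this.name = name;
            this.durationNanos = durationNanos;
            this.failed = failed;
        }
    }

    final List<Span> spans = new ArrayList<>();

    void record(String name, long durationNanos, boolean failed) {
        spans.add(new Span(name, durationNanos, failed));
    }
}

// Decorator: times every process() call of the wrapped processor.
final class SimpleTracingProcessor<K, V> implements SimpleProcessor<K, V> {
    private final SpanRecorder recorder;
    private final String name;
    private final SimpleProcessor<K, V> delegate;

    SimpleTracingProcessor(SpanRecorder recorder, String name, SimpleProcessor<K, V> delegate) {
        this.recorder = recorder;
        this.name = name;
        this.delegate = delegate;
    }

    @Override
    public void process(K key, V value) {
        long start = System.nanoTime();
        boolean failed = true;
        try {
            delegate.process(key, value);
            failed = false;
        } finally {
            // Recorded even when the delegate throws, flagged as failed.
            recorder.record(name, System.nanoTime() - start, failed);
        }
    }
}

final class SimpleTracingSupplier<K, V> implements SimpleProcessorSupplier<K, V> {
    private final SpanRecorder recorder;
    private final String name;
    private final SimpleProcessorSupplier<K, V> delegate;

    SimpleTracingSupplier(SpanRecorder recorder, String name, SimpleProcessorSupplier<K, V> delegate) {
        this.recorder = recorder;
        this.name = name;
        this.delegate = delegate;
    }

    @Override
    public SimpleProcessor<K, V> get() {
        // Wrap a fresh delegate on every call, as a supplier should.
        return new SimpleTracingProcessor<>(recorder, name, delegate.get());
    }
}

final class TracingDemo {
    static SpanRecorder run() {
        SpanRecorder recorder = new SpanRecorder();
        List<String> output = new ArrayList<>();
        SimpleProcessorSupplier<String, String> upper =
                () -> (k, v) -> output.add(k + "=" + v.toUpperCase());
        SimpleProcessor<String, String> processor =
                new SimpleTracingSupplier<>(recorder, "uppercase", upper).get();
        processor.process("a", "x");
        processor.process("b", "y");
        return recorder;
    }
}
```

Putting the timing in `finally` means a failing processor still produces a span. That matches the goal of recording per-processor latency for every record.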
> >>>>
> >>>> My first approach was something like this:
> >>>>
> >>>> ```
> >>>> final StreamsBuilder builder = kafkaStreamsTracing.builder();
> >>>> ```
> >>>>
> >>>> where `KafkaStreamsTracing#builder` looks like this:
> >>>>
> >>>> ```
> >>>> public StreamsBuilder builder() {
> >>>>   // Relies on the Topology and StreamsBuilder constructors proposed in the diff (not public API today).
> >>>>   return new StreamsBuilder(new Topology(new TracingInternalTopologyBuilder(kafkaTracing)));
> >>>> }
> >>>> ```
> >>>>
> >>>> Then, once the builder creates a topology, the processors will be wrapped by the `TracingProcessorSupplier` described above.
> >>>>
> >>>> This approach is probably too naive, but it works as an initial proof of concept.
> >>>>
> >>>>> Off the top of my head, here are some other approaches you might evaluate:
> >>>>> * You mentioned interceptors. Perhaps we could create a ProcessorInterceptor interface and add a config to set it.
> >>>>
> >>>> This sounds very interesting to me. Then we wouldn't need to touch internal APIs, and would just provide some configs. One challenge here is how to define the hooks. In the consumer/producer, the lifecycle is clear: `onConsume`/`onSend` and then `onCommit`/`onAcknowledgement` methods. What would this look like for stream processors? Maybe `beforeProcess(context, key, value)` and `afterProcess(context, key, value)`.
> >>>>
> >>>>> * Perhaps we could simply build the tracing headers into Streams. Is there a benefit to making it customizable?
> >>>>
> >>>> I don't understand this option completely. Do you mean something like KIP-159 (https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams)?
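Here is a possible shape for the `beforeProcess`/`afterProcess` hooks discussed above, as a sketch only. `ProcessorInterceptor`, `InterceptedProcessor`, and `InterceptorDemo` are hypothetical. They mirror the hooks proposed in the thread and are not a Kafka Streams API. The `context` argument is reduced to the processor name, and how interceptors would be registered through a config is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical hook interface based on the beforeProcess/afterProcess idea in
// this thread; not a Kafka Streams API. "context" is reduced to the processor name.
interface ProcessorInterceptor<K, V> {
    void beforeProcess(String processorName, K key, V value);

    void afterProcess(String processorName, K key, V value);
}

// Runs a processor's logic between the hooks of every configured interceptor.
final class InterceptedProcessor<K, V> {
    private final String name;
    private final BiConsumer<K, V> logic;
    private final List<ProcessorInterceptor<K, V>> interceptors;

    InterceptedProcessor(String name, BiConsumer<K, V> logic, List<ProcessorInterceptor<K, V>> interceptors) {
        this.name = name;
        this.logic = logic;
        this.interceptors = interceptors;
    }

    void process(K key, V value) {
        for (ProcessorInterceptor<K, V> interceptor : interceptors) {
            interceptor.beforeProcess(name, key, value);
        }
        try {
            logic.accept(key, value);
        } finally {
            // Reverse order, so the first interceptor opened is the last one closed.
            for (int i = interceptors.size() - 1; i >= 0; i--) {
                interceptors.get(i).afterProcess(name, key, value);
            }
        }
    }
}

final class InterceptorDemo {
    static List<String> run() {
        List<String> events = new ArrayList<>();
        ProcessorInterceptor<String, Integer> logging = new ProcessorInterceptor<String, Integer>() {
            @Override
            public void beforeProcess(String processorName, String key, Integer value) {
                events.add("before:" + processorName + ":" + key);
            }

            @Override
            public void afterProcess(String processorName, String key, Integer value) {
                events.add("after:" + processorName + ":" + key);
            }
        };
        List<ProcessorInterceptor<String, Integer>> interceptors = new ArrayList<>();
        interceptors.add(logging);
        InterceptedProcessor<String, Integer> filter = new InterceptedProcessor<>(
                "filter", (k, v) -> events.add("process:" + k + "=" + v), interceptors);
        filter.process("a", 1);
        return events;
    }
}
```

Calling `afterProcess` from `finally` keeps the hook pairs balanced even when processing throws. For a tracer, that means no span is left open.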
> >>>> Headers available in the Streams DSL would allow users to create "custom" traces, for instance:
> >>>>
> >>>> ```
> >>>> // Hypothetical rich-function signature exposing headers, as discussed in KIP-159.
> >>>> stream.map((headers, k, v) -> {
> >>>>   Span span = kafkaTracing.nextSpan(headers).start();
> >>>>   try {
> >>>>     doSomething(k, v);
> >>>>   } finally {
> >>>>     span.finish(); // finish the span even if doSomething throws
> >>>>   }
> >>>>   return KeyValue.pair(k, v);
> >>>> });
> >>>> ```
> >>>>
> >>>> But it won't be possible to instrument the existing processors exposed by the DSL just by enabling headers in the Streams DSL.
> >>>>
> >>>> If we could define a way to pass a `ProcessorSupplier` to be used by `StreamsBuilder#internalTopology` (not sure whether via the constructor or some other way), that would be enough to support this use case.
> >>>>
> >>>>> Also, as Matthias said, you would need to create a KIP to propose this change, but of course we can continue this preliminary discussion until you feel confident to create the KIP.
> >>>>
> >>>> Happy to do it once the approach is clearer.
> >>>>
> >>>> Cheers,
> >>>> Jorge.
> >>>>
> >>>> On Mon, Sep 17, 2018 at 17:09, John Roesler (<j...@confluent.io>) wrote:
> >>>>
> >>>>> If I understand the request, it's about tracking the latencies for a specific record, not the aggregated latencies for each processor.
> >>>>>
> >>>>> Jorge,
> >>>>>
> >>>>> The diff you posted only contains the library-side changes, and it's not obvious how you would use this to insert the desired tracing code. Perhaps you could provide a snippet demonstrating how you want to use this change to enable tracing?
> >>>>>
> >>>>> Also, as Matthias said, you would need to create a KIP to propose this change, but of course we can continue this preliminary discussion until you feel confident to create the KIP.
> >>>>>
> >>>>> Off the top of my head, here are some other approaches you might evaluate:
> >>>>> * You mentioned interceptors. Perhaps we could create a ProcessorInterceptor interface and add a config to set it.
> >>>>> * Perhaps we could simply build the tracing headers into Streams. Is there a benefit to making it customizable?
> >>>>>
> >>>>> Thanks for considering this problem!
> >>>>> -John
> >>>>>
> >>>>> On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com> wrote:
> >>>>>
> >>>>>> Hello Jorge,
> >>>>>>
> >>>>>> From the TracingProcessor implementation, it seems you want to track per-processor processing latency, is that right? If that is the case, you can actually use the per-processor metrics, which include latency sensors.
> >>>>>>
> >>>>>> If you do want to track the latency of processing a certain record, then you'd probably need the processor implementation in your repo. In this case, though, I'd suggest providing a WrapperProcessorSupplier for the users rather than modifying InternalStreamsTopology: more specifically, you can provide an `abstract WrapperProcessorSupplier implements ProcessorSupplier` and then let users instantiate this class instead of the "bare-metal" interface. WDYT?
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <quilcate.jo...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thanks for your answer, Matthias!
> >>>>>>>
> >>>>>>> What I'm looking for is something similar to interceptors, but for stream processors.
> >>>>>>>
> >>>>>>> In Zipkin (and probably other tracing implementations as well), we use headers to propagate the context of a trace, i.e., we add metadata to the Kafka record so we can create references to a trace.
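The header-based propagation Jorge describes reduces to two steps: "extract or start" on input and "inject" on output. The sketch below rests on several assumptions. Headers are modeled as a `Map<String, String>` rather than Kafka's `Headers`, which use String keys and `byte[]` values and allow duplicate keys. The header names `x-trace-id` and `x-span-id` are hypothetical and are not Zipkin's B3 format. IDs are random 64-bit hex strings. `TraceContext` and `PropagationDemo` are illustrative names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

// Sketch of trace-context propagation through record headers.
// Assumptions: headers modeled as Map<String, String>; header names are
// hypothetical (Zipkin's B3 propagation uses its own names and encoding).
final class TraceContext {
    static final String TRACE_ID_HEADER = "x-trace-id";
    static final String SPAN_ID_HEADER = "x-span-id";

    final String traceId;
    final String spanId;
    final String parentSpanId; // null for a root span

    private TraceContext(String traceId, String spanId, String parentSpanId) {
        this.traceId = traceId;
        this.spanId = spanId;
        this.parentSpanId = parentSpanId;
    }

    /** Continues the trace carried by the headers, or starts a new trace if there is none. */
    static TraceContext nextSpan(Map<String, String> headers) {
        String traceId = headers.get(TRACE_ID_HEADER);
        if (traceId == null) {
            return new TraceContext(newId(), newId(), null);
        }
        return new TraceContext(traceId, newId(), headers.get(SPAN_ID_HEADER));
    }

    /** Writes this span's identity so the next hop can reference it as its parent. */
    void inject(Map<String, String> headers) {
        headers.put(TRACE_ID_HEADER, traceId);
        headers.put(SPAN_ID_HEADER, spanId);
    }

    private static String newId() {
        return Long.toHexString(ThreadLocalRandom.current().nextLong());
    }
}

final class PropagationDemo {
    /** Returns {root, child}: a span started on an untraced input and the span of the next hop. */
    static TraceContext[] run() {
        Map<String, String> inputHeaders = new HashMap<>(); // record arrives without trace headers
        TraceContext root = TraceContext.nextSpan(inputHeaders);

        Map<String, String> outputHeaders = new HashMap<>(); // headers of the record sent downstream
        root.inject(outputHeaders);

        TraceContext child = TraceContext.nextSpan(outputHeaders); // next processor or consumer
        return new TraceContext[] {root, child};
    }
}
```

The child span shares the root's trace ID and points to the root's span ID as its parent. Those references are what let a tracing backend stitch separate hops into one trace.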
> >>>>>>> Now that headers are part of the Kafka Streams Processor API, we can propagate context from inputs (consumers) to outputs (producers) by using `KafkaClientSupplier` (e.g., <https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
> >>>>>>>
> >>>>>>> "Input to output" traces could be enough for some use cases, but we are looking for a more detailed trace that could cover cases like side effects (e.g., for each processor), where input/output and processor latencies can be recorded. This is why I have been looking at how to decorate the `ProcessorSupplier`, and at all the changes shown in the comparison. Here is a gist of how we are planning to decorate the `addProcessor` method: https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> >>>>>>>
> >>>>>>> Hope this makes a bit more sense now :)
> >>>>>>>
> >>>>>>> On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax (<matth...@confluent.io>) wrote:
> >>>>>>>
> >>>>>>>>>> I'm experimenting with how to add tracing to Kafka Streams.
> >>>>>>>>
> >>>>>>>> What do you mean by this exactly? Is there a JIRA? I am fine with removing `final` from `InternalTopologyBuilder#addProcessor()`; it's an internal class.
> >>>>>>>>
> >>>>>>>> However, the diff also shows
> >>>>>>>>
> >>>>>>>>> public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> >>>>>>>>
> >>>>>>>> This has two impacts: first, it modifies `Topology`, which is part of the public API, and would require a KIP. Second, it exposes `InternalTopologyBuilder` as part of the public API, something we should not do.
> >>>>>>>>
> >>>>>>>> I am also not sure why you want to do this (btw, also a public API change requiring a KIP). However, this should not be necessary.
> >>>>>>>>
> >>>>>>>>> public StreamsBuilder(final Topology topology) {
> >>>>>>>>
> >>>>>>>> I think I am lacking some context on what you are trying to achieve. Maybe you can elaborate on the problem you are trying to solve?
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> >>>>>>>>> Hi everyone,
> >>>>>>>>>
> >>>>>>>>> I'm experimenting with how to add tracing to Kafka Streams.
> >>>>>>>>>
> >>>>>>>>> One option is to override and access `InternalTopologyBuilder#addProcessor`. Currently this method is final, and the builder is not exposed as part of `StreamsBuilder`:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> public class StreamsBuilder {
> >>>>>>>>>
> >>>>>>>>>     /** The actual topology that is constructed by this StreamsBuilder. */
> >>>>>>>>>     private final Topology topology = new Topology();
> >>>>>>>>>
> >>>>>>>>>     /** The topology's internal builder. */
> >>>>>>>>>     final InternalTopologyBuilder internalTopologyBuilder = topology.internalTopologyBuilder;
> >>>>>>>>>
> >>>>>>>>>     private final InternalStreamsBuilder internalStreamsBuilder = new InternalStreamsBuilder(internalTopologyBuilder);
> >>>>>>>>>
> >>>>>>>>>     // ... (excerpt; rest of the class omitted)
> >>>>>>>>> }
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> The goal is that if `builder#addProcessor` were exposed, we could decorate every `ProcessorSupplier` and capture traces from it:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> @Override
> >>>>>>>>> public void addProcessor(String name, ProcessorSupplier supplier, String... predecessorNames) {
> >>>>>>>>>     // Wrap every supplier before registering it with the real builder.
> >>>>>>>>>     super.addProcessor(name, new TracingProcessorSupplier(tracer, name, supplier), predecessorNames);
> >>>>>>>>> }
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> Would it make sense to propose this change (https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology), or is there a better way to do this? TopologyWrapper does something similar: https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> >>>>>>>>>
> >>>>>>>>> Thanks in advance for any help.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Jorge.
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>
> >>> --
> >>> -- Guozhang
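Jorge's original idea of overriding `addProcessor` to decorate every supplier can be modeled with a toy builder. `ToyTopologyBuilder`, `TracingToyTopologyBuilder`, and `WrapDemo` are hypothetical, and a processor is modeled as a `Consumer<String>`. In Kafka Streams, the corresponding method on `InternalTopologyBuilder` is `final` and the builder is internal. That is the obstacle Matthias points out, so this only shows the mechanics, not an API Kafka exposes.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Toy stand-in for InternalTopologyBuilder (hypothetical, stdlib only).
// addProcessor is deliberately non-final so that a subclass can decorate it.
class ToyTopologyBuilder {
    final Map<String, Supplier<Consumer<String>>> processors = new LinkedHashMap<>();
    final Map<String, String[]> predecessors = new LinkedHashMap<>();

    public void addProcessor(String name, Supplier<Consumer<String>> supplier, String... predecessorNames) {
        processors.put(name, supplier);
        predecessors.put(name, predecessorNames);
    }
}

// Wraps every supplier registered through addProcessor, as proposed in the thread.
final class TracingToyTopologyBuilder extends ToyTopologyBuilder {
    final List<String> events = new ArrayList<>();

    @Override
    public void addProcessor(String name, Supplier<Consumer<String>> supplier, String... predecessorNames) {
        super.addProcessor(name, () -> {
            Consumer<String> delegate = supplier.get(); // fresh delegate per get()
            return value -> {
                events.add("start " + name);
                try {
                    delegate.accept(value);
                } finally {
                    events.add("end " + name);
                }
            };
        }, predecessorNames);
    }
}

final class WrapDemo {
    static List<String> run() {
        TracingToyTopologyBuilder builder = new TracingToyTopologyBuilder();
        // The caller registers a plain processor; the builder adds tracing around it.
        builder.addProcessor("upper", () -> value -> builder.events.add("process " + value.toUpperCase()), "source");
        builder.processors.get("upper").get().accept("hi");
        return builder.events;
    }
}
```

Because the wrapping happens inside the builder, user code registers ordinary processors and never sees the tracing layer. That is the ease of adoption Jorge is after. The cost is that the builder must be open for extension.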