Good to know, thanks Matthias!

You mentioned the previous operator, but what about
`peek().mapValues().peek()`? Will both `peek`s run on the same thread as
well?
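
To make the question concrete, here is a toy model in plain Java of how I
understand your description. It assumes each operator in a chain simply calls
the next one directly. `Node`, `peek` and `mapValues` below are hypothetical
stand-ins made up for illustration, not Kafka Streams classes, so this only
illustrates the assumption and says nothing about the real implementation:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public class PeekThreadSketch {
    // Hypothetical stand-in for a chained operator; NOT a Kafka Streams class.
    interface Node<V> {
        void process(V value);
    }

    // Runs the side effect, then hands the unchanged value to the next node.
    static <V> Node<V> peek(Consumer<V> action, Node<V> next) {
        return value -> {
            action.accept(value);
            next.process(value);
        };
    }

    // Transforms the value, then hands the result to the next node.
    static <V, R> Node<V> mapValues(Function<V, R> mapper, Node<R> next) {
        return value -> next.process(mapper.apply(value));
    }

    // Builds peek -> mapValues -> peek and returns the thread name seen by each peek.
    static List<String> run(String input) {
        List<String> threads = new ArrayList<>();
        Node<String> sink = value -> { };
        Node<String> second =
            peek(v -> threads.add(Thread.currentThread().getName()), sink);
        Node<String> mapped = mapValues(v -> v.toUpperCase(), second);
        Node<String> first =
            peek(v -> threads.add(Thread.currentThread().getName()), mapped);
        first.process(input);
        return threads;
    }

    public static void main(String[] args) {
        List<String> threads = run("hello");
        System.out.println(threads.get(0).equals(threads.get(1)));
    }
}
```

If the real operators are chained like this, both callbacks would record the
same thread name, and a trace could be handed from the first `peek` to the
second without crossing threads.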

On Tue, Sep 25, 2018 at 23:14, Matthias J. Sax (<matth...@confluent.io>)
wrote:

> Just for clarification:
>
> `peek()` would run on the same thread as the previous operator. Even
> if---strictly speaking---there is no public contract to guarantee this,
> it would be the case in the current implementation, and I also don't see
> any reason why this would change at any point in the future, because
> it's the most efficient implementation I can think of.
>
> -Matthias
>
> On 9/22/18 4:51 AM, Jorge Esteban Quilcate Otoya wrote:
> > Thanks, everyone!
> >
> > @Bill, the main issue with using `KStream#peek()` is that AFAIK each
> > `peek` processor runs on a potentially different thread, so passing the
> > trace between them could be challenging. It would also require users to
> > add these operators themselves, which could be too cumbersome.
> >
> > @Guozhang and @John: I will first focus on creating the
> > `TracingProcessorSupplier` for instrumenting custom `Processors`, and I
> > will keep the idea of a `ProcessorInterceptor` in the back of my head to
> > see if it makes sense to propose a KIP for this.
> >
> > Thanks again for your feedback!
> >
> > Cheers,
> > Jorge.
> > On Wed, Sep 19, 2018 at 1:55, Bill Bejeck (<bbej...@gmail.com>)
> > wrote:
> >
> >> Jorge:
> >>
> >> I have a crazy idea off the top of my head.
> >>
> >> Would something as low-tech as using KStream.peek calls on either side
> >> of certain processors to record start and end times work?
> >>
> >> Thanks,
> >> Bill
> >>
> >> On Tue, Sep 18, 2018 at 4:38 PM Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >>> Jorge:
> >>>
> >>> My suggestion was to let your users implement the
> >>> TracingProcessorSupplier / TracingProcessor directly instead of the
> >>> baseline ProcessorSupplier / Processor. Would that work for you?
> >>>
> >>>
> >>> Guozhang
> >>>
> >>>
> >>> On Tue, Sep 18, 2018 at 8:02 AM, Jorge Esteban Quilcate Otoya <
> >>> quilcate.jo...@gmail.com> wrote:
> >>>
> >>>> Thanks Guozhang and John.
> >>>>
> >>>> @Guozhang:
> >>>>
> >>>>> I'd suggest providing a
> >>>>> WrapperProcessorSupplier for the users rather than modifying
> >>>>> InternalStreamsTopology: more specifically, you can provide an
> >>>>> `abstract WrapperProcessorSupplier implements ProcessorSupplier` and
> >>>>> then let users instantiate this class instead of the "bare-metal"
> >>>>> interface. WDYT?
> >>>>
> >>>> Yes, in the gist, I have a class implementing `ProcessorSupplier`:
> >>>>
> >>>> ```
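> >>>> import brave.kafka.clients.KafkaTracing;
> >>>> import org.apache.kafka.streams.processor.Processor;
> >>>> import org.apache.kafka.streams.processor.ProcessorSupplier;
> >>>>
> >>>> // Decorates a ProcessorSupplier so that every Processor it creates is
> >>>> // wrapped in a TracingProcessor (defined in the gist).
> >>>> public class TracingProcessorSupplier<K, V> implements ProcessorSupplier<K, V> {
> >>>>   final KafkaTracing kafkaTracing;
> >>>>   final String name;
> >>>>   final ProcessorSupplier<K, V> delegate;
> >>>>
> >>>>   public TracingProcessorSupplier(KafkaTracing kafkaTracing,
> >>>>       String name, ProcessorSupplier<K, V> delegate) {
> >>>>     this.kafkaTracing = kafkaTracing;
> >>>>     this.name = name;
> >>>>     this.delegate = delegate;
> >>>>   }
> >>>>
> >>>>   @Override public Processor<K, V> get() {
> >>>>     return new TracingProcessor<>(kafkaTracing, name, delegate.get());
> >>>>   }
> >>>> }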
> >>>> ```
> >>>>
> >>>> My challenge is how to wrap the Topology Processors created by
> >>>> `StreamsBuilder#build` so that this instrumentation is easy for Kafka
> >>>> Streams users to adopt.
> >>>>
> >>>> @John:
> >>>>
> >>>>> The diff you posted only contains the library-side changes, and it's
> >>>>> not obvious how you would use this to insert the desired tracing code.
> >>>>> Perhaps you could provide a snippet demonstrating how you want to use
> >>>>> this change to enable tracing?
> >>>>
> >>>> My first approach was something like this:
> >>>>
> >>>> ```
> >>>> final StreamsBuilder builder = kafkaStreamsTracing.builder();
> >>>> ```
> >>>>
> >>>> Where `KafkaStreamsTracing#builder` looks like this:
> >>>>
> >>>> ```
> >>>>   public StreamsBuilder builder() {
> >>>>     return new StreamsBuilder(
> >>>>         new Topology(new TracingInternalTopologyBuilder(kafkaTracing)));
> >>>>   }
> >>>> ```
> >>>>
> >>>> Then, once the builder creates a topology, `processors` will be
> >>>> wrapped by the `TracingProcessorSupplier` described above.
> >>>>
> >>>> Probably this approach is too naive but works as an initial proof of
> >>>> concept.
> >>>>
> >>>>> Off the top of my head, here are some other approaches you might
> >>>>> evaluate:
> >>>>> * you mentioned interceptors. Perhaps we could create a
> >>>>> ProcessorInterceptor interface and add a config to set it.
> >>>>
> >>>> This sounds very interesting to me. Then we won't need to touch
> >>>> internal APIs, and can just provide some configs. One challenge here is
> >>>> how to define the hooks. In the consumer/producer, the lifecycle is
> >>>> clear: `onConsume`/`onSend` and then `onCommit`/`onAcknowledgement`.
> >>>> For Stream processors, what would this look like? Maybe
> >>>> `beforeProcess(context, key, value)` and
> >>>> `afterProcess(context, key, value)`.
> >>>>
> >>>>> * perhaps we could simply build the tracing headers into Streams. Is
> >>>>> there a benefit to making it customizable?
> >>>>
> >>>> I don't understand this option completely. Do you mean something like
> >>>> KIP-159
> >>>> (https://cwiki.apache.org/confluence/display/KAFKA/KIP-159%3A+Introducing+Rich+functions+to+Streams)?
> >>>> Having headers available in the Streams DSL would allow users to create
> >>>> "custom" traces, for instance:
> >>>>
> >>>> ```
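> >>>> // Hypothetical header-aware `map` overload; not part of the current DSL.
> >>>> stream.map((headers, k, v) -> {
> >>>>   Span span = kafkaTracing.nextSpan(headers).start();
> >>>>   doSomething(k, v);
> >>>>   span.finish();
> >>>>   return KeyValue.pair(k, v);
> >>>> });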
> >>>> ```
> >>>>
> >>>> but it won't be possible to instrument the existing processors exposed
> >>>> by the DSL only by enabling headers on the Streams DSL.
> >>>>
> >>>> If we can define a way to pass a `ProcessorSupplier` to be used by
> >>>> `StreamsBuilder#internalTopology` (not sure if via constructor or some
> >>>> other way), that would be enough to support this use case.
> >>>>
> >>>>> Also, as Matthias said, you would need to create a KIP to propose this
> >>>>> change, but of course we can continue this preliminary discussion until
> >>>>> you feel confident to create the KIP.
> >>>>
> >>>> Happy to do it once the approach is clearer.
> >>>>
> >>>> Cheers,
> >>>> Jorge.
> >>>>
> >>>> On Mon, Sep 17, 2018 at 17:09, John Roesler (<j...@confluent.io>)
> >>>> wrote:
> >>>>
> >>>>> If I understand the request, it's about tracking the latencies for a
> >>>>> specific record, not the aggregated latencies for each processor.
> >>>>>
> >>>>> Jorge,
> >>>>>
> >>>>> The diff you posted only contains the library-side changes, and it's
> >>>>> not obvious how you would use this to insert the desired tracing code.
> >>>>> Perhaps you could provide a snippet demonstrating how you want to use
> >>>>> this change to enable tracing?
> >>>>>
> >>>>> Also, as Matthias said, you would need to create a KIP to propose this
> >>>>> change, but of course we can continue this preliminary discussion until
> >>>>> you feel confident to create the KIP.
> >>>>>
> >>>>> Off the top of my head, here are some other approaches you might
> >>>>> evaluate:
> >>>>> * you mentioned interceptors. Perhaps we could create a
> >>>>> ProcessorInterceptor interface and add a config to set it.
> >>>>> * perhaps we could simply build the tracing headers into Streams. Is
> >>>>> there a benefit to making it customizable?
> >>>>>
> >>>>> Thanks for considering this problem!
> >>>>> -John
> >>>>>
> >>>>> On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com>
> >>>>> wrote:
> >>>>>
> >>>>>> Hello Jorge,
> >>>>>>
> >>>>>> From the TracingProcessor implementation it seems you want to track
> >>>>>> per-processor processing latency, is that right? If this is the case,
> >>>>>> you can actually use the per-processor metrics, which include latency
> >>>>>> sensors.
> >>>>>>
> >>>>>> If you do want to track, for a certain record, the latency of
> >>>>>> processing it, then you'd probably need the processor implementation
> >>>>>> in your repo. In this case, though, I'd suggest providing a
> >>>>>> WrapperProcessorSupplier for the users rather than modifying
> >>>>>> InternalStreamsTopology: more specifically, you can provide an
> >>>>>> `abstract WrapperProcessorSupplier implements ProcessorSupplier` and
> >>>>>> then let users instantiate this class instead of the "bare-metal"
> >>>>>> interface. WDYT?
> >>>>>>
> >>>>>>
> >>>>>> Guozhang
> >>>>>>
> >>>>>> On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> >>>>>> quilcate.jo...@gmail.com> wrote:
> >>>>>>
> >>>>>>> Thanks for your answer, Matthias!
> >>>>>>>
> >>>>>>> What I'm looking for is something similar to interceptors, but for
> >>>>>>> Stream Processors.
> >>>>>>>
> >>>>>>> In Zipkin (and probably other tracing implementations as well) we are
> >>>>>>> using Headers to propagate the context of a trace (i.e. adding metadata
> >>>>>>> to the Kafka Record, so we can create references to a trace).
> >>>>>>> Now that Headers are part of the Kafka Streams Processor API, we can
> >>>>>>> propagate context from inputs (Consumers) to outputs (Producers) by
> >>>>>>> using `KafkaClientSupplier` (e.g.
> >>>>>>> <https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
> >>>>>>>
> >>>>>>> "Input to Output" traces could be enough for some use cases, but we
> >>>>>>> are looking for a more detailed trace that could cover cases like
> >>>>>>> side effects (e.g. for each processor), where input/output and
> >>>>>>> processor latencies can be recorded. This is why I have been looking
> >>>>>>> for a way to decorate the `ProcessorSupplier`, hence all the changes
> >>>>>>> shown in the comparison. Here is a gist of how we are planning to
> >>>>>>> decorate the `addProcessor` method:
> >>>>>>> https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> >>>>>>>
> >>>>>>> Hope this makes a bit more sense now :)
> >>>>>>>
> >>>>>>> On Sun, Sep 16, 2018 at 20:51, Matthias J. Sax
> >>>>>>> (<matth...@confluent.io>) wrote:
> >>>>>>>
> >>>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
> >>>>>>>>
> >>>>>>>> What do you mean by this exactly? Is there a JIRA? I am fine with
> >>>>>>>> removing `final` from `InternalTopologyBuilder#addProcessor()` -- it's
> >>>>>>>> an internal class.
> >>>>>>>>
> >>>>>>>> However, the diff also shows
> >>>>>>>>
> >>>>>>>>> public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> >>>>>>>>
> >>>>>>>> This has two impacts: first, it modifies `Topology`, which is part of
> >>>>>>>> the public API and would require a KIP. Second, it exposes
> >>>>>>>> `InternalTopologyBuilder` as part of the public API -- something we
> >>>>>>>> should not do.
> >>>>>>>>
> >>>>>>>> I am also not sure why you want to do this (btw: also a public API
> >>>>>>>> change requiring a KIP). However, this should not be necessary.
> >>>>>>>>
> >>>>>>>>>     public StreamsBuilder(final Topology topology)  {
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> I think I am lacking some context on what you are trying to achieve.
> >>>>>>>> Maybe you can elaborate on the problem you are trying to solve?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> -Matthias
> >>>>>>>>
> >>>>>>>> On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> >>>>>>>>> Hi everyone,
> >>>>>>>>>
> >>>>>>>>> I'm experimenting on how to add tracing to Kafka Streams.
> >>>>>>>>>
> >>>>>>>>> One option is to override and access
> >>>>>>>>> `InternalTopologyBuilder#addProcessor`. Currently this method is
> >>>>>>>>> final, and the builder is not exposed as part of `StreamsBuilder`:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> public class StreamsBuilder {
> >>>>>>>>>
> >>>>>>>>>     /** The actual topology that is constructed by this StreamsBuilder. */
> >>>>>>>>>     private final Topology topology = new Topology();
> >>>>>>>>>
> >>>>>>>>>     /** The topology's internal builder. */
> >>>>>>>>>     final InternalTopologyBuilder internalTopologyBuilder =
> >>>>>>>>>         topology.internalTopologyBuilder;
> >>>>>>>>>
> >>>>>>>>>     private final InternalStreamsBuilder internalStreamsBuilder =
> >>>>>>>>>         new InternalStreamsBuilder(internalTopologyBuilder);
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> The idea is that if `builder#addProcessor` were exposed, we could
> >>>>>>>>> decorate every `ProcessorSupplier` and capture traces from it:
> >>>>>>>>>
> >>>>>>>>> ```
> >>>>>>>>> @Override
> >>>>>>>>> public void addProcessor(String name, ProcessorSupplier supplier,
> >>>>>>>>>     String... predecessorNames) {
> >>>>>>>>>   super.addProcessor(name,
> >>>>>>>>>       new TracingProcessorSupplier(tracer, name, supplier),
> >>>>>>>>>       predecessorNames);
> >>>>>>>>> }
> >>>>>>>>> ```
> >>>>>>>>>
> >>>>>>>>> Would it make sense to propose this as a change
> >>>>>>>>> (https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology),
> >>>>>>>>> or maybe there is a better way to do this?
> >>>>>>>>> TopologyWrapper does something similar:
> >>>>>>>>> https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> >>>>>>>>>
> >>>>>>>>> Thanks in advance for any help.
> >>>>>>>>>
> >>>>>>>>> Cheers,
> >>>>>>>>> Jorge.
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>>
> >>>>>>
> >>>>>> --
> >>>>>> -- Guozhang
> >>>>>>
> >>>>>
> >>>>
> >>>
> >>>
> >>>
> >>> --
> >>> -- Guozhang
> >>>
> >>
> >
>
>
