If I understand the request, it's about tracking the latencies for a
specific record, not the aggregated latencies for each processor.

Jorge,

The diff you posted only contains the library-side changes, and it's not
obvious how you would use this to insert the desired tracing code.
Perhaps you could provide a snippet demonstrating how you want to use this
change to enable tracing?

Also, as Matthias said, you would need to create a KIP to propose this
change, but of course we can continue this preliminary discussion until you
feel confident to create the KIP.

Off the top of my head, here are some other approaches you might evaluate:
* you mentioned interceptors. Perhaps we could create a
ProcessorInterceptor interface and add a config to set it.
* perhaps we could simply build the tracing headers into Streams. Is there
a benefit to making it customizable?
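
To make the first option concrete, here is a minimal sketch of what a
ProcessorInterceptor hook might look like. Everything in it is an assumption
for illustration: `ProcessorInterceptor` does not exist in Streams today,
`Processor` is a simplified stand-in for the real interface (which also has
`init` and `close`), and `intercept` and `demo` are made-up helper names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class InterceptorSketch {

    /** Simplified stand-in for a Streams Processor (assumption: no init/close). */
    interface Processor<K, V> {
        void process(K key, V value);
    }

    /** Hypothetical hook invoked around every process() call. */
    interface ProcessorInterceptor {
        void before(String processorName, Object key, Object value);
        void after(String processorName, Object key, Object value, long elapsedNanos);
    }

    /** Wraps a supplier so each Processor it creates reports to the interceptor. */
    static <K, V> Supplier<Processor<K, V>> intercept(
            String name, Supplier<Processor<K, V>> delegate, ProcessorInterceptor interceptor) {
        return () -> {
            Processor<K, V> inner = delegate.get();
            return (key, value) -> {
                interceptor.before(name, key, value);
                long start = System.nanoTime();
                try {
                    inner.process(key, value);
                } finally {
                    // Reported even if the wrapped processor throws.
                    interceptor.after(name, key, value, System.nanoTime() - start);
                }
            };
        };
    }

    /** Runs one record through an intercepted processor and returns the event log. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        ProcessorInterceptor recording = new ProcessorInterceptor() {
            @Override
            public void before(String processorName, Object key, Object value) {
                events.add("before " + processorName + " " + key);
            }

            @Override
            public void after(String processorName, Object key, Object value, long elapsedNanos) {
                events.add("after " + processorName + " " + key);
            }
        };
        Supplier<Processor<String, String>> plain =
                () -> (key, value) -> events.add("process " + key + "=" + value.toUpperCase());
        Supplier<Processor<String, String>> traced = intercept("upper", plain, recording);
        traced.get().process("k1", "hello");
        return events;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

A tracing implementation would presumably open a span in `before` and close
it in `after`. Whether this should be a config-driven hook or a user-facing
wrapper supplier is something the KIP would need to settle.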

Thanks for considering this problem!
-John

On Mon, Sep 17, 2018 at 12:30 AM Guozhang Wang <wangg...@gmail.com> wrote:

> Hello Jorge,
>
> From the TracingProcessor implementation it seems you want to track
> per-processor processing latency, is that right? If this is the case you
> can actually use the per-processor metrics which include latency sensors.
>
> If you do want to track, for a certain record, what's the latency of
> processing it, then you'd probably need the processor implementation in
> your repo. In this case, though, I'd suggest providing a
> WrapperProcessorSupplier for users rather than modifying
> InternalStreamsTopology: more specifically, you can provide an
> `abstract class WrapperProcessorSupplier implements ProcessorSupplier` and
> then let users instantiate this class instead of the "bare-metal"
> interface. WDYT?
>
>
> Guozhang
>
> On Sun, Sep 16, 2018 at 12:56 PM, Jorge Esteban Quilcate Otoya <
> quilcate.jo...@gmail.com> wrote:
>
> > Thanks for your answer, Matthias!
> >
> > What I'm looking for is something similar to interceptors, but for Stream
> > Processors.
> >
> > In Zipkin (and probably other tracing implementations as well) we are
> > using Headers to propagate the context of a trace (i.e. adding metadata
> > to the Kafka Record, so we can create references to a trace).
> > Now that Headers are part of the Kafka Streams Processor API, we can
> > propagate context from inputs (Consumers) to outputs (Producers) by using
> > `KafkaClientSupplier` (e.g.
> > <https://github.com/openzipkin/brave/blob/master/instrumentation/kafka-streams/src/main/java/brave/kafka/streams/TracingKafkaClientSupplier.java>).
> >
> > "Input to Output" traces could be enough for some use cases, but we are
> > looking for a more detailed trace that could cover cases like side
> > effects (e.g. for each processor), where input/output and processor
> > latencies can be recorded. This is why I have been looking at how to
> > decorate the `ProcessorSupplier`, which explains all the changes shown
> > in the comparison. Here is a gist of how we are planning to decorate the
> > `addProcessor` method:
> > https://github.com/openzipkin/brave/compare/master...jeqo:kafka-streams-topology#diff-8282914d84039affdf7c37251b905b44R7
> >
> > Hope this makes a bit more sense now :)
> >
> > On Sun, Sep 16, 2018 at 8:51 PM, Matthias J. Sax
> > (<matth...@confluent.io>) wrote:
> >
> > > >> I'm experimenting on how to add tracing to Kafka Streams.
> > >
> > > What do you mean by this exactly? Is there a JIRA? I am fine removing
> > > `final` from `InternalTopologyBuilder#addProcessor()` -- it's an
> > > internal class.
> > >
> > > However, the diff also shows
> > >
> > > > public Topology(final InternalTopologyBuilder internalTopologyBuilder) {
> > >
> > > This has two impacts: first, it modifies `Topology`, which is part of
> > > the public API, and so would require a KIP. Second, it exposes
> > > `InternalTopologyBuilder` as part of the public API -- something we
> > > should not do.
> > >
> > > I am also not sure why you want to do this (btw: it is also a public
> > > API change requiring a KIP). However, this should not be necessary.
> > >
> > > >     public StreamsBuilder(final Topology topology)  {
> > >
> > >
> > > I think I am lacking some context on what you are trying to achieve.
> > > Maybe you can elaborate on the problem you are trying to solve?
> > >
> > >
> > > -Matthias
> > >
> > > On 9/15/18 10:31 AM, Jorge Esteban Quilcate Otoya wrote:
> > > > Hi everyone,
> > > >
> > > > I'm experimenting on how to add tracing to Kafka Streams.
> > > >
> > > > One option is to override and access
> > > > `InternalTopologyBuilder#addProcessor`. Currently this method is
> > > > final, and the builder is not exposed as part of `StreamsBuilder`:
> > > >
> > > > ```
> > > > public class StreamsBuilder {
> > > >
> > > >     /** The actual topology that is constructed by this StreamsBuilder. */
> > > >     private final Topology topology = new Topology();
> > > >
> > > >     /** The topology's internal builder. */
> > > >     final InternalTopologyBuilder internalTopologyBuilder =
> > > >         topology.internalTopologyBuilder;
> > > >
> > > >     private final InternalStreamsBuilder internalStreamsBuilder =
> > > >         new InternalStreamsBuilder(internalTopologyBuilder);
> > > > ```
> > > >
> > > > The goal is that if `builder#addProcessor` is exposed, we could
> > > > decorate every `ProcessorSupplier` and capture traces from it:
> > > >
> > > > ```
> > > > @Override
> > > > public void addProcessor(String name, ProcessorSupplier supplier,
> > > >                          String... predecessorNames) {
> > > >   super.addProcessor(name,
> > > >       new TracingProcessorSupplier(tracer, name, supplier),
> > > >       predecessorNames);
> > > > }
> > > > ```
> > > >
> > > > Would it make sense to propose this as a change
> > > > (https://github.com/apache/kafka/compare/trunk...jeqo:tracing-topology),
> > > > or maybe there is a better way to do this?
> > > > TopologyWrapper does something similar:
> > > > https://github.com/apache/kafka/blob/trunk/streams/src/test/java/org/apache/kafka/streams/TopologyWrapper.java
> > > >
> > > > Thanks in advance for any help.
> > > >
> > > > Cheers,
> > > > Jorge.
> > > >
> > >
> > >
> >
>
>
>
> --
> -- Guozhang
>
