Guozhang, I updated the code slightly to avoid object creation and I did some perf investigations.
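The message does not say how object creation was avoided. Purely as an illustration, one common technique is to route the frequent two-argument `forward(key, value)` call through a single pre-allocated, immutable target object, so that only callers who ask for a specific child or timestamp allocate anything. The sketch below is plain Java. Its classes (`To`, `Child`, `Context`) are hypothetical and only loosely modeled on the `To` objects discussed for KIP-251. It is not the PR's code and not Kafka's API.

```java
import java.util.ArrayList;
import java.util.List;

public class ForwardSketch {

    // Hypothetical, immutable stand-in for a "To" forwarding target:
    // childName == null means "all children", timestamp < 0 means "keep the input timestamp".
    static final class To {
        final String childName;
        final long timestamp;

        private To(String childName, long timestamp) {
            this.childName = childName;
            this.timestamp = timestamp;
        }

        static To all() { return new To(null, -1L); }

        static To child(String childName) { return new To(childName, -1L); }

        // Returns a new instance instead of mutating, so shared instances stay safe.
        To withTimestamp(long timestamp) { return new To(childName, timestamp); }
    }

    // Hypothetical downstream node that just records what it receives.
    static final class Child {
        final String name;
        final List<String> received = new ArrayList<>();

        Child(String name) { this.name = name; }
    }

    // Hypothetical, heavily simplified processor context.
    static final class Context {
        // Allocated once and reused by every two-argument forward() call,
        // so the common path creates no new target object per record.
        private static final To SEND_TO_ALL = To.all();

        final List<Child> children = new ArrayList<>();
        long currentTimestamp;

        void forward(Object key, Object value) {
            forward(key, value, SEND_TO_ALL);
        }

        void forward(Object key, Object value, To to) {
            long ts = to.timestamp >= 0 ? to.timestamp : currentTimestamp;
            for (Child child : children) {
                if (to.childName == null || to.childName.equals(child.name)) {
                    child.received.add(key + "=" + value + "@" + ts);
                }
            }
        }
    }

    public static void main(String[] args) {
        Context ctx = new Context();
        Child a = new Child("a");
        Child b = new Child("b");
        ctx.children.add(a);
        ctx.children.add(b);
        ctx.currentTimestamp = 100L;

        ctx.forward("k1", "v1");                                   // all children, input timestamp
        ctx.forward("k2", "v2", To.child("b").withTimestamp(42L)); // only "b", new timestamp

        System.out.println(a.received); // [k1=v1@100]
        System.out.println(b.received); // [k1=v1@100, k2=v2@42]
    }
}
```

Making the hypothetical `To` immutable is what makes sharing `SEND_TO_ALL` safe. If `withTimestamp` mutated the instance instead, a shared constant could leak one caller's timestamp into another caller's forward.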
1) JMH benchmark with the topology below, using TopologyTestDriver to pipe data through the topology:

> StreamsBuilder builder = new StreamsBuilder();
> KStream<Object, Object> stream = builder.stream("topic").transform(new
>     TransformerSupplier<Object, Object, KeyValue<Object, Object>>() {
>   @Override
>   public Transformer<Object, Object, KeyValue<Object, Object>> get() {
>     return new Transformer<Object, Object, KeyValue<Object, Object>>() {
>       ProcessorContext context;
>
>       @Override
>       public void init(ProcessorContext context) {
>         this.context = context;
>       }
>
>       @Override
>       public KeyValue<Object, Object> transform(Object key, Object value) {
>         context.forward(key, value);
>         return null;
>       }
>
>       @Override
>       public KeyValue<Object, Object> punctuate(long timestamp) {
>         return null;
>       }
>
>       @Override
>       public void close() {}
>     };
>   }
> });

I ran this with zero, one, and five downstream nodes, each added like this:

> stream.foreach(new ForeachAction<Object, Object>() {
>   @Override
>   public void apply(Object key, Object value) {}
> });

On `trunk` I get the following numbers (5 warmup iterations, 15 measurement iterations):

Zero downstream nodes:

> # Run complete. Total time: 00:00:20
>
> Benchmark                     Mode  Cnt        Score       Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  2246686.693 ± 56372.920  ops/s

One downstream node:

> # Run complete. Total time: 00:00:20
>
> Benchmark                     Mode  Cnt        Score       Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  2206277.298 ± 51855.465  ops/s

Five downstream nodes:

> # Run complete. Total time: 00:00:20
>
> Benchmark                     Mode  Cnt        Score       Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  1855833.516 ± 46901.811  ops/s

Repeating the same on my PR branch, I get the following numbers:

Zero downstream nodes:

> # Run complete. Total time: 00:00:20
>
> Benchmark                     Mode  Cnt        Score       Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  2192891.762 ± 77598.908  ops/s

One downstream node:

> # Run complete. Total time: 00:00:20
>
> Benchmark                     Mode  Cnt        Score       Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  2190676.716 ± 77030.594  ops/s

Five downstream nodes:

> # Run complete. Total time: 00:00:20
>
> Benchmark                     Mode  Cnt        Score       Error  Units
> PapiBenchmark.runTestDriver  thrpt   15  1921632.144 ± 66276.232  ops/s

I also had a look at GC and did not observe any issues. The objects that get created all live in the young generation and are thus cheap to clean up.

Let me know if this addresses your concerns.

-Matthias

On 2/11/18 9:36 PM, Guozhang Wang wrote:
> Hi Matthias,
>
> Just clarifying a meta question along side with my vote: we still need to
> understand the overhead of the `To` objects during run time to determine
> whether we would use it in the final proposal or using overloading
> functions. Right?
>
>
> Guozhang
>
> On Sun, Feb 11, 2018 at 9:33 PM, Guozhang Wang <wangg...@gmail.com> wrote:
>
>> +1
>>
>> On Fri, Feb 9, 2018 at 5:31 AM, Bill Bejeck <bbej...@gmail.com> wrote:
>>
>>> Thanks for the KIP, +1 for me.
>>>
>>> -Bill
>>>
>>> On Fri, Feb 9, 2018 at 6:45 AM, Damian Guy <damian....@gmail.com> wrote:
>>>
>>>> Thanks Matthias, +1
>>>>
>>>> On Fri, 9 Feb 2018 at 02:42 Ted Yu <yuzhih...@gmail.com> wrote:
>>>>
>>>>> +1
>>>>> -------- Original message --------
>>>>> From: "Matthias J. Sax" <matth...@confluent.io>
>>>>> Date: 2/8/18 6:05 PM (GMT-08:00)
>>>>> To: dev@kafka.apache.org
>>>>> Subject: [VOTE] KIP-251: Allow timestamp manipulation in Processor API
>>>>>
>>>>> Hi,
>>>>>
>>>>> I want to start the vote for KIP-251:
>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-251%3A+Allow+timestamp+manipulation+in+Processor+API
>>>>>
>>>>>
>>>>> -Matthias
>>
>> --
>> -- Guozhang
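As a quick worked check of the trunk and PR-branch numbers in the benchmark results, the sketch below computes the relative throughput change of the PR branch versus trunk and whether the `score ± error` ranges overlap. By default, JMH's `Error` column is the half-width of a 99.9% confidence interval. Treating overlapping ranges as "no measurable difference" is an assumption and only a rough heuristic, not a significance test. The class and method names are hypothetical.

```java
import java.util.Locale;

public class ThroughputDelta {

    // One JMH result: the mean score and the reported "Error" value (interval half-width).
    static final class Result {
        final double score;
        final double error;

        Result(double score, double error) {
            this.score = score;
            this.error = error;
        }
    }

    // Relative change of the PR branch versus trunk, in percent.
    static double percentChange(Result trunk, Result pr) {
        return (pr.score - trunk.score) / trunk.score * 100.0;
    }

    // True if the ranges [score - error, score + error] of the two results overlap.
    static boolean intervalsOverlap(Result a, Result b) {
        return a.score - a.error <= b.score + b.error
            && b.score - b.error <= a.score + a.error;
    }

    public static void main(String[] args) {
        String[] downstreamNodes = {"zero", "one", "five"};
        // Scores and errors (ops/s) copied from the trunk and PR-branch runs.
        Result[] trunk = {
            new Result(2246686.693, 56372.920),
            new Result(2206277.298, 51855.465),
            new Result(1855833.516, 46901.811)
        };
        Result[] pr = {
            new Result(2192891.762, 77598.908),
            new Result(2190676.716, 77030.594),
            new Result(1921632.144, 66276.232)
        };
        for (int i = 0; i < downstreamNodes.length; i++) {
            System.out.printf(Locale.ROOT, "%-4s downstream: %+.2f%% (intervals overlap: %b)%n",
                downstreamNodes[i], percentChange(trunk[i], pr[i]),
                intervalsOverlap(trunk[i], pr[i]));
        }
    }
}
```

Running `main` reports -2.39%, -0.71% and +3.55% for zero, one and five downstream nodes, with overlapping ranges in all three cases. That is consistent with the PR branch showing no measurable throughput change in this particular benchmark.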