[ https://issues.apache.org/jira/browse/KAFKA-5684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16109412#comment-16109412 ]
Guozhang Wang commented on KAFKA-5684:
--------------------------------------

Thanks for filing the JIRA.

I think we made a mistake when discussing KIP-160: the key/valueSerde and the mapper actually overlap. In the end we call the key/valueSerde only when the K/V types are byte[], but because of that we make a runtime instanceof check on every record. That check is unnecessary: users have provided a mapper whose output is a {{String}}, so the mapper should take care of the deserialization if K/V are really byte[].

So I suggest we also change the internal implementation along with this JIRA, such that (cc [~james.c] WDYT):

a) if a mapper is provided, ignore the serdes and always print as {{label: mapper(k, v)}} without checking the k/v types at runtime at all; the label can be the default value if it is not provided.

b) if a mapper is not provided, print as {{label: toString(serialized(k), serialized(v))}}, where the label and serdes can be the defaults if they are not provided.

> KStreamPrintProcessor as customized KStreamPeekProcessor
> --------------------------------------------------------
>
>                 Key: KAFKA-5684
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5684
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>            Reporter: Paolo Patierno
>            Assignee: Paolo Patierno
>            Priority: Minor
>
> Hi,
> the {{KStreamPrintProcessor}} is implemented from scratch (from the
> {{AbstractProcessor}}), and the same goes for the related supplier.
> It looks to me like it is just a special {{KStreamPeekProcessor}} with
> forwardDownStream set to false that also allows specifying the Serdes
> instances to use if keys/values are bytes.
> At the same time, when used by a {{print()}} method, it provides a fast way
> to print data flowing through the pipeline (while using just {{peek()}} you
> need to write the code yourself).
> I think it could be useful to refactor the {{KStreamPrintProcessor}} so that
> it derives from the {{KStreamPeekProcessor}} and customizes its behavior.
> Thanks,
> Paolo.
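
For reference, cases (a) and (b) from the comment above could look roughly like the sketch below. This is not the actual {{KStreamPrintProcessor}} code. The class and method names are hypothetical, a plain {{BiFunction}} stands in for the mapper, and a {{Function<byte[], ?>}} stands in for the deserializing half of a serde, so that the example runs without the Kafka jars. It also assumes that "serialized(k)" in (b) means "deserialize k with the serde when it is a byte[]".

```java
import java.nio.charset.StandardCharsets;
import java.util.function.BiFunction;
import java.util.function.Function;

// Hypothetical sketch of the two proposed print paths; not Kafka Streams code.
class PrintFormatSketch {

    // Case (a): a mapper is provided, so serdes are ignored and no
    // instanceof check is made on the key or value.
    static <K, V> String formatWithMapper(String label,
                                          BiFunction<K, V, String> mapper,
                                          K key, V value) {
        return label + ": " + mapper.apply(key, value);
    }

    // Case (b): no mapper, so byte[] keys/values are deserialized first
    // and everything else is printed via its string conversion.
    static String formatWithSerdes(String label,
                                   Function<byte[], ?> keyDeserializer,
                                   Function<byte[], ?> valueDeserializer,
                                   Object key, Object value) {
        Object k = key instanceof byte[] ? keyDeserializer.apply((byte[]) key) : key;
        Object v = value instanceof byte[] ? valueDeserializer.apply((byte[]) value) : value;
        return label + ": " + k + ", " + v;
    }

    public static void main(String[] args) {
        // Stand-in for a String serde's deserializer.
        Function<byte[], String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);

        // (a) the mapper decides the whole output format.
        System.out.println(formatWithMapper("stream", (k, v) -> k + "=" + v, "key", 1));

        // (b) the byte[] key goes through the deserializer, the Integer value does not.
        byte[] rawKey = "key".getBytes(StandardCharsets.UTF_8);
        System.out.println(formatWithSerdes("stream", utf8, utf8, rawKey, 1));
    }
}
```

With this split, the per-record type check only remains on path (b), which is the point of the suggestion.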

--
This message was sent by Atlassian JIRA (v6.4.14#64029)