[ 
https://issues.apache.org/jira/browse/KAFKA-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15323718#comment-15323718
 ] 

Jeff Klukas commented on KAFKA-3801:
------------------------------------

Let me give more context for the example. We have an application that produces 
JSON messages to a Kafka topic interleaved with occasional checkpoint messages 
that are of {{Long}} type.

If I want to create a KStream of just the checkpoint messages, I need to filter 
out the JSON messages before deserializing. With the proposed static 
{{deserialize}} method, it would look like this:

{{KStream<Long, Long> checkpointStream = builder.stream(Serdes.Long(), Serdes.ByteArray(), inputTopicName)}}
{{    .filter((key, bytes) -> bytes != null && bytes.length == 8)}}
{{    .mapValues(LongDeserializer::deserialize);}}

I need to use ByteArraySerde for the values when calling {{stream}}, and then 
do the deserialization in a {{mapValues}} invocation after filtering out 
messages of the wrong type.
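The filter-then-deserialize step can be tried outside Kafka with a small stdlib-only sketch. It assumes the topic's values are modeled as a plain {{List<byte[]>}} and decodes 8-byte payloads big-endian, which is the byte order Kafka's {{LongSerializer}} writes. {{CheckpointFilter}}, {{checkpoints}}, and {{decodeLong}} are hypothetical names, not Kafka API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class CheckpointFilter {

    // Hypothetical helper: decode an 8-byte big-endian payload into a long.
    // Big-endian is ByteBuffer's default order and matches Kafka's LongSerializer.
    public static long decodeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    // Keep only non-null 8-byte values (assumed to be checkpoints), then decode them.
    // The JSON values are dropped before any decoding is attempted.
    public static List<Long> checkpoints(List<byte[]> values) {
        return values.stream()
                .filter(bytes -> bytes != null && bytes.length == Long.BYTES)
                .map(CheckpointFilter::decodeLong)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Interleaved values: JSON documents plus two 8-byte checkpoints.
        List<byte[]> values = Arrays.asList(
                "{\"event\":\"click\"}".getBytes(StandardCharsets.UTF_8),
                ByteBuffer.allocate(Long.BYTES).putLong(42L).array(),
                "{\"event\":\"view\"}".getBytes(StandardCharsets.UTF_8),
                ByteBuffer.allocate(Long.BYTES).putLong(1000L).array());

        System.out.println(checkpoints(values)); // prints [42, 1000]
    }
}
```

Note that the length check is only a heuristic: a JSON message that happens to be exactly 8 bytes long, such as {"a":12}, would also pass the filter and be decoded as a checkpoint.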

Another option would be to materialize the stream to a topic after the filter 
and then call {{builder.stream(Serdes.Long(), Serdes.Long(), newTopicName)}}, 
but I'd like to avoid unnecessary materialization.

So in the current scheme, I need to create an instance of {{LongDeserializer}} 
separately so that I can then call its {{deserialize}} method in {{mapValues}}.

This situation probably won't occur frequently, so I understand if it's decided 
not to bother considering this change.

> Provide static serialize() and deserialize() for use as method references
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-3801
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3801
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients, streams
>            Reporter: Jeff Klukas
>            Assignee: Guozhang Wang
>            Priority: Minor
>             Fix For: 0.10.1.0
>
>
> While most calls to {{Serializer.serialize}} and {{Deserializer.deserialize}} 
> are abstracted away in Kafka Streams through the use of {{Serdes}} classes, 
> there are some instances where developers may want to call them directly. The 
> serializers and deserializers for simple types don't require any 
> configuration and could be static, but currently it's necessary to create an 
> instance to use those methods.
> I'd propose moving serialization logic into a {{public static byte[] 
> serialize(? data)}} method and deserialization logic into a {{public static ? 
> deserialize(byte[] data)}} method. The existing instance methods would simply 
> call the static versions.
> See a full example for LongSerializer and LongDeserializer here:
> https://github.com/apache/kafka/compare/trunk...jklukas:static-serde-methods?expand=1
> In Java 8, these static methods then become available for method references 
> in code like {{kstream.mapValues(LongDeserializer::deserialize)}} without the 
> user needing to create an instance of {{LongDeserializer}}.
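The delegation pattern proposed above can be sketched without a Kafka dependency. The two interfaces below are hypothetical, simplified stand-ins for Kafka's {{Serializer}} and {{Deserializer}} (the real interfaces also declare {{configure}} and {{close}}), and the classes are illustrative re-creations, not the code in the linked branch. Each class keeps its logic in a static method, and the instance method required by the interface delegates to it.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

public class StaticSerdeDemo {

    // Hypothetical, simplified stand-ins for Kafka's Serializer and Deserializer
    // interfaces (the real ones also declare configure() and close()).
    interface SimpleSerializer<T> {
        byte[] serialize(String topic, T data);
    }

    interface SimpleDeserializer<T> {
        T deserialize(String topic, byte[] data);
    }

    static class LongSerializer implements SimpleSerializer<Long> {
        // Static version: needs no configuration or instance state.
        public static byte[] serialize(Long data) {
            if (data == null) {
                return null;
            }
            return ByteBuffer.allocate(Long.BYTES).putLong(data).array(); // big-endian
        }

        // The instance method required by the interface just delegates.
        @Override
        public byte[] serialize(String topic, Long data) {
            return serialize(data);
        }
    }

    static class LongDeserializer implements SimpleDeserializer<Long> {
        public static Long deserialize(byte[] data) {
            if (data == null) {
                return null;
            }
            if (data.length != Long.BYTES) {
                // Kafka throws SerializationException here; this sketch uses a JDK exception.
                throw new IllegalArgumentException("Expected 8 bytes but got " + data.length);
            }
            return ByteBuffer.wrap(data).getLong();
        }

        @Override
        public Long deserialize(String topic, byte[] data) {
            return deserialize(data);
        }
    }

    public static void main(String[] args) {
        // Method references to the static versions; no instances are created.
        // This mirrors kstream.mapValues(LongDeserializer::deserialize).
        Function<Long, byte[]> encode = LongSerializer::serialize;
        Function<byte[], Long> decode = LongDeserializer::deserialize;

        List<Long> roundTrip = Arrays.asList(1L, -7L, Long.MAX_VALUE).stream()
                .map(encode)
                .map(decode)
                .collect(Collectors.toList());
        System.out.println(roundTrip); // prints [1, -7, 9223372036854775807]

        // The existing instance API keeps working through delegation.
        LongDeserializer instance = new LongDeserializer();
        System.out.println(instance.deserialize("checkpoints", LongSerializer.serialize(42L))); // prints 42
    }
}
```

Because the static {{deserialize(byte[])}} takes one argument and the instance {{deserialize(String, byte[])}} takes two, a method reference like {{LongDeserializer::deserialize}} aimed at a one-argument function type resolves unambiguously to the static overload.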



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
