Ayoub Omari created KAFKA-16573:
-----------------------------------
Summary: Streams does not specify where a Serde is needed
Key: KAFKA-16573
URL: https://issues.apache.org/jira/browse/KAFKA-16573
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 3.7.0
Reporter: Ayoub Omari
Example topology:
{code:java}
builder
.table("input", Consumed.`with`(Serdes.String(), Serdes.String()))
.groupBy((key, value) => new KeyValue(value, key))
.count()
.toStream()
.to("output", Produced.`with`(Serdes.String(), Serdes.Long()))
{code}
At runtime, we get the following exception
{code:java}
Please specify a key serde or set one through
StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
org.apache.kafka.common.config.ConfigException: Please specify a key serde or
set one through StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG
at
org.apache.kafka.streams.StreamsConfig.defaultKeySerde(StreamsConfig.java:1857)
at
org.apache.kafka.streams.processor.internals.AbstractProcessorContext.keySerde(AbstractProcessorContext.java:92)
at
org.apache.kafka.streams.processor.internals.SerdeGetter.keySerde(SerdeGetter.java:47)
at
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
at
org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareKeySerde(WrappingNullableUtils.java:90)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.initStoreSerde(MeteredKeyValueStore.java:188)
at
org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:143)
at
org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:232)
at
org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:102)
at
org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:258){code}
The error does not give information about the line or the processor causing the
issue.
Here a Grouped was missing inside the groupBy, but because the groupBy api
doesn't force to define Grouped, this one can be missed, and it could be
difficult to spot on a more complex topology.
Also, for someone who needs control over serdes in the topology and doesn't
want to define default serdes.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)