[
https://issues.apache.org/jira/browse/KAFKA-14070?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Balaji Rao updated KAFKA-14070:
-------------------------------
Description:
When using key-value state stores with the Processor API, one can add key-value
state stores of arbitrary key types to a topology. This can lead to the
method `queryMetadataForKey` in `KafkaStreams` being used with incorrect
expectations.
In my understanding, `queryMetadataForKey` uses the source topics of the
processor connected to the store to return the `KeyQueryMetadata`. This means
that it could provide "incorrect" answers when used with key-value stores of
arbitrary key types. The description of the method should be improved to make
users aware of this pitfall.
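To make the concern concrete, here is a dependency-free toy model of the behaviour as I understand it. It is not Kafka Streams code: `QueryMetadataPitfall`, `partitionFor`, `metadataPartitionFor` and the partition count are hypothetical names and values, and a plain `hashCode` stands in for Kafka's real partitioner. It only illustrates how a lookup derived from the source topic's partitioning can point at a different partition than the one actually holding the store entry.
{code:scala}
import scala.collection.mutable

// Toy model only: NOT Kafka Streams code. All names here are hypothetical.
object QueryMetadataPitfall {
  val numPartitions = 4 // assumed partition count of the source topic

  // Stand-in for the default partitioner. Kafka hashes the serialized key
  // bytes; a plain hashCode keeps this sketch free of dependencies.
  def partitionFor(key: Any): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // One local "store" per partition, mimicking a partitioned state store.
  val stores: Vector[mutable.Map[String, String]] =
    Vector.fill(numPartitions)(mutable.Map.empty[String, String])

  // The processor writes store keys that are unrelated to the record key,
  // into the store of whichever partition the record arrived on.
  def process(recordKey: Int): Unit = {
    val p = partitionFor(recordKey)
    ('a' to 'j').foreach(x => stores(p).put(x.toString, recordKey.toString))
  }

  // What a lookup based on the source topic's partitioning would answer
  // when it is handed a store key instead of a record key.
  def metadataPartitionFor(storeKey: String): Int = partitionFor(storeKey)

  def main(args: Array[String]): Unit = {
    process(recordKey = 2)
    val actual = stores.indexWhere(_.contains("a"))
    val reported = metadataPartitionFor("a")
    println(s"entry for 'a' lives in partition $actual, lookup reports $reported")
  }
}
{code}
In this model the entry for "a" ends up in the partition of the record that triggered the write, while the lookup hashes the store key itself, so the two can disagree whenever the store key is not the record key.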
Edit: Example Scala code
{code:scala}
val input = streamsBuilder.stream(
  "input-topic",
  Consumed.`with`(Serdes.intSerde, Serdes.stringSerde)
)

private val storeBuilder = Stores
  .keyValueStoreBuilder[String, String](
    Stores.inMemoryKeyValueStore("store"),
    Serdes.stringSerde,
    Serdes.stringSerde
  )

streamsBuilder.addStateStore(storeBuilder)

input.process(
  new ProcessorSupplier[Int, String, Void, Void] {
    override def get(): Processor[Int, String, Void, Void] =
      new Processor[Int, String, Void, Void] {
        var store: KeyValueStore[String, String] = _

        override def init(context: ProcessorContext[Void, Void]): Unit = {
          super.init(context)
          store = context.getStateStore("store")
        }

        override def process(record: Record[Int, String]): Unit = {
          ('a' to 'j').foreach(x => store.put(s"$x", s"${record.key}"))
        }
      }
  },
  "store"
)
{code}
In the code sample above, the usage of `queryMetadataForKey` on `store` to find
the
was:
When using key-value state stores with the Processor API, one can add key-value
state stores of arbitrary key types to a topology. This can lead to the
method `queryMetadataForKey` in `KafkaStreams` being used with incorrect
expectations.
In my understanding, `queryMetadataForKey` uses the source topics of the
processor connected to the store to return the `KeyQueryMetadata`. This means
that it could provide "incorrect" answers when used with key-value stores of
arbitrary key types. The description of the method should be improved to make
users aware of this pitfall.
> Improve documentation for queryMetadataForKey
> ---------------------------------------------
>
> Key: KAFKA-14070
> URL: https://issues.apache.org/jira/browse/KAFKA-14070
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Affects Versions: 3.2.0
> Reporter: Balaji Rao
> Priority: Minor
>