[ https://issues.apache.org/jira/browse/KAFKA-7882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias J. Sax resolved KAFKA-7882.
------------------------------------
Resolution: Duplicate
> StateStores are frequently closed during the 'transform' method
> ---------------------------------------------------------------
>
> Key: KAFKA-7882
> URL: https://issues.apache.org/jira/browse/KAFKA-7882
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Mateusz Owczarek
> Priority: Major
>
> Hello, I have a problem with the state store frequently being closed while
> transforming incoming records. To ensure that only one record per key and
> window reaches the aggregate, I created a custom Transformer (I know
> something similar will be introduced with the suppress method on KTable,
> but my implementation is quite simple and IMO should work correctly) with
> the following implementation:
> {code:java}
> override def transform(key: Windowed[K], value: V): (Windowed[K], V) = {
>   val partition = context.partition()
>   if (partition != -1) store.put(key.key(), (value, partition), key.window().start())
>   else logger.warn("-1 partition")
>   // Returning null means no 1:1 forwarding; context.forward and the commit
>   // logic live in the punctuator callback.
>   null
> }
> {code}
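The idea behind this transformer (keep only the latest value per key and window, then emit from the punctuator) can be modeled without Kafka. The sketch below is a hypothetical, in-memory illustration only: `DedupBuffer`, `transform` and `flush` are made-up names standing in for the window store and the punctuator, not Kafka Streams APIs.

```scala
import scala.collection.mutable

// Hypothetical, Kafka-free model of the transformer above: buffer the latest
// value per (key, window start) in transform(), emit everything on flush().
final class DedupBuffer[K, V] {
  // Insertion-ordered so flush() emits entries in first-seen order.
  private val buffer = mutable.LinkedHashMap.empty[(K, Long), V]

  // Mirrors transform(): store instead of forwarding; the last write wins.
  def transform(key: K, windowStart: Long, value: V): Unit =
    buffer.update((key, windowStart), value)

  // Mirrors the punctuator: return each buffered entry once, then clear.
  def flush(): List[((K, Long), V)] = {
    val out = buffer.toList
    buffer.clear()
    out
  }
}

object DedupExample {
  def main(args: Array[String]): Unit = {
    val buf = new DedupBuffer[String, Int]
    buf.transform("a", 0L, 1)
    buf.transform("a", 0L, 2) // same key and window: replaces 1
    buf.transform("b", 0L, 5)
    println(buf.flush()) // List(((a,0),2), ((b,0),5))
  }
}
```

Keying on the pair (key, window start) matches how the original code writes {{key.key()}} with {{key.window().start()}} as the window timestamp.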
>
> Instead, I get the following error:
> {code:java}
> Store MyStore is currently closed
> {code}
> This problem appears only when the number of stream threads (or input
> topic partitions) is greater than 1, even if I only write to the store and
> turn off punctuation.
> If punctuation is enabled, however, I sometimes get -1 as the partition
> value in the transform method. I'm familiar with the basic docs, but I
> haven't found anything that helps here.
> I build my state store like this:
> {code:java}
> val stateStore = Stores.windowStoreBuilder(
>   Stores.persistentWindowStore(
>     stateStoreName,
>     // retention: maintain duration + window size + one extra day
>     timeWindows.maintainMs() + timeWindows.sizeMs + TimeUnit.DAYS.toMillis(1),
>     timeWindows.segments,
>     timeWindows.sizeMs,
>     false // retainDuplicates
>   ),
>   serde[K],
>   serde[(V, Int)]
> )
> {code}
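For a sense of scale, the retention formula in the store builder can be evaluated with concrete numbers. The window size and maintain duration below are assumed values (5-minute windows, a one-day maintain duration) chosen only to show the arithmetic; they are plain Longs rather than a real {{TimeWindows}} instance, so the sketch runs without Kafka.

```scala
import java.util.concurrent.TimeUnit

// Assumed values standing in for timeWindows.sizeMs and timeWindows.maintainMs().
object RetentionExample {
  val sizeMs: Long = TimeUnit.MINUTES.toMillis(5)   // 300000
  val maintainMs: Long = TimeUnit.DAYS.toMillis(1)  // 86400000

  // Same formula as the store builder: maintain + size + one extra day.
  val retentionMs: Long = maintainMs + sizeMs + TimeUnit.DAYS.toMillis(1)

  def main(args: Array[String]): Unit =
    println(s"retention = $retentionMs ms (${TimeUnit.MILLISECONDS.toHours(retentionMs)} h)")
}
```

Under these assumptions the store retains records for 173,100,000 ms, a little over 48 hours.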
> and add it to the DSL topology like this:
> {code:java}
> builder.addStateStore(stateStore)
> (...).transform(new MyTransformer(...), "MyStore")
> {code}
> INB4: I don't close any state stores manually, and I gave them as long a
> retention time as possible for debugging. I tried to hotfix this with a
> retry in the transform method: the state stores eventually reopen and the
> `put` call succeeds, but this approach is questionable and I'm not sure it
> actually works.
> Edit:
> Could this be because
> {code:java}StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG{code} is set to a
> low value? If I understand correctly, records are then flushed to disk more
> frequently; could that cause the store to be closed?
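To reproduce the report, the two settings discussed above can be supplied as plain string properties. This sketch assumes two stream threads and a zero-byte cache (which disables record caching; the default is 10485760 bytes). The keys are the string values of {{StreamsConfig.NUM_STREAM_THREADS_CONFIG}} and {{StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG}}; the values are illustrative only.

```scala
import java.util.Properties

// Hypothetical reproduction settings; only the two keys relevant to the report.
object ReproConfig {
  def props(): Properties = {
    val p = new Properties()
    p.put("num.stream.threads", "2")        // error reported only with more than 1 thread
    p.put("cache.max.bytes.buffering", "0") // deliberately low cache size (0 disables caching)
    p
  }

  def main(args: Array[String]): Unit = {
    val p = props()
    println(p.getProperty("num.stream.threads"))
    println(p.getProperty("cache.max.bytes.buffering"))
  }
}
```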
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)