[ https://issues.apache.org/jira/browse/KAFKA-15297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17750622#comment-17750622 ]
Bruno Cadonna edited comment on KAFKA-15297 at 8/3/23 8:09 AM:
---------------------------------------------------------------

[~ableegoldman] You can observe the flush order by feeding some records into the input topics and looking for the following log message:
https://github.com/apache/kafka/blob/2e1947d240607d53f071f61c875cfffc3fec47fe/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L513
I changed the log message from trace to debug to avoid too much noise. I will also update the description with this information.


> Cache flush order might not be topological order
> -------------------------------------------------
>
>                 Key: KAFKA-15297
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15297
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.4.0
>            Reporter: Bruno Cadonna
>            Priority: Major
>        Attachments: minimal_example.png
>
>
> The flush order of the state store caches in Kafka Streams might not correspond to the topological order of the state stores in the topology. The order depends on how the processors and state stores are added to the topology.
>
> In some cases downstream state stores might be flushed before upstream state stores. That means that during a commit, records in upstream caches might end up in downstream caches that have already been flushed during the same commit. If a crash happens at that point, those records in the downstream caches are lost, for two reasons:
> 1.
Records in caches are only changelogged after they are flushed from the cache. However, the downstream caches have already been flushed and will not be flushed again during the same commit.
> 2. The offsets of the input records that caused the records that are now blocked in the downstream caches are committed during the same commit, and so they will not be re-processed after the crash.
>
> An example of a topology where the flush order of the caches is wrong is the following:
> {code:java}
> final String inputTopic1 = "inputTopic1";
> final String inputTopic2 = "inputTopic2";
> final String outputTopic1 = "outputTopic1";
> final String processorName = "processor1";
> final String stateStoreA = "stateStoreA";
> final String stateStoreB = "stateStoreB";
> final String stateStoreC = "stateStoreC";
>
> streamsBuilder.stream(inputTopic2, Consumed.with(Serdes.String(), Serdes.String()))
>     .process(
>         () -> new Processor<String, String, String, String>() {
>             private ProcessorContext<String, String> context;
>
>             @Override
>             public void init(ProcessorContext<String, String> context) {
>                 this.context = context;
>             }
>
>             @Override
>             public void process(Record<String, String> record) {
>                 context.forward(record);
>             }
>
>             @Override
>             public void close() {}
>         },
>         Named.as("processor1")
>     )
>     .to(outputTopic1, Produced.with(Serdes.String(), Serdes.String()));
>
> streamsBuilder.stream(inputTopic1, Consumed.with(Serdes.String(), Serdes.String()))
>     .toTable(Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreA).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreB).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .mapValues(value -> value, Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as(stateStoreC).withKeySerde(Serdes.String()).withValueSerde(Serdes.String()))
>     .toStream()
>     .to(outputTopic1,
>         Produced.with(Serdes.String(), Serdes.String()));
>
> final Topology topology = streamsBuilder.build(streamsConfiguration);
> topology.connectProcessorAndStateStores(processorName, stateStoreC);
> {code}
> This code results in the attached topology.
>
> In the topology, {{processor1}} is connected to {{stateStoreC}}. If {{processor1}} is added to the topology before the other processors, i.e., if the right branch of the topology is added before the left branch as in the code above, the cache of {{stateStoreC}} is flushed before the caches of {{stateStoreA}} and {{stateStoreB}}.
>
> You can observe the flush order by feeding some records into the input topics of the topology and looking for the following log message:
> https://github.com/apache/kafka/blob/2e1947d240607d53f071f61c875cfffc3fec47fe/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java#L513
> I changed the log message from trace to debug to avoid too much noise.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
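Since the linked log line is now at debug level, reproducing this requires raising the logger for that class to DEBUG. A minimal sketch, assuming the standard class-named logger and a log4j 1.x-style properties file as used by Kafka; adjust to your application's logging setup:

{code}
# Hypothetical log4j.properties fragment for the Streams application:
# raise only the state-manager logger to DEBUG, leaving the rest of Streams at its default level
log4j.logger.org.apache.kafka.streams.processor.internals.ProcessorStateManager=DEBUG
{code}

With this in place, the per-store flush messages should appear in the order in which the caches are flushed, making it possible to compare that order against the topological order of stateStoreA, stateStoreB, and stateStoreC.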