Yennick Trevels created KAFKA-4963:
--------------------------------------
Summary: Global Store: startup recovery process skipping processor
Key: KAFKA-4963
URL: https://issues.apache.org/jira/browse/KAFKA-4963
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 0.10.2.0
Reporter: Yennick Trevels
This issue concerns the recovery process of a global store. It might be
that I'm misunderstanding the design of global stores since they're all quite
new to me, but I wanted to verify this case.
I'm trying to create a global store with a processor that transforms the
values from the source and puts them into the state store, and I want all these
transformed values to be available in every streams job (hence the use of a
global store).
I'll give you an example which I created based on an existing Kafka Streams
unit test:
{code}
final StateStoreSupplier storeSupplier = Stores.create("my-store")
.withStringKeys().withIntegerValues().inMemory().disableLogging().build();
final String global = "global";
final String topic = "topic";
final KeyValueStore<String, Integer> globalStore = (KeyValueStore<String,
Integer>) storeSupplier.get();
final TopologyBuilder topologyBuilder = this.builder
    .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER,
        topic, "processor", define(new ValueToLengthStatefulProcessor("my-store")));
driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
assertEquals("value1".length(), globalStore.get("key1"));
assertEquals("value2".length(), globalStore.get("key2"));
{code}
The ValueToLengthStatefulProcessor simply takes the incoming value,
calculates the length of the string, and puts the result in the state store.
Note the difference in value types between the source stream (String values)
and the state store (Integer values).
If I understand global stores correctly and based on what I've tried out
already, the stream of data runs like this:
a source stream named "global" reading values from a Kafka topic called "topic"
-> ValueToLengthStatefulProcessor -> "my-store" state store
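The processor's transform amounts to no more than this. A plain-Java sketch, assuming process() just writes the value's length under the incoming key (the real ValueToLengthStatefulProcessor is a Kafka Streams Processor and isn't shown in this report; the class name ValueToLengthSketch and the HashMap stand-in are illustrative only):

```java
import java.util.HashMap;
import java.util.Map;

// Stdlib-only sketch of what ValueToLengthStatefulProcessor presumably does;
// a HashMap stands in for the "my-store" KeyValueStore<String, Integer>.
public class ValueToLengthSketch {
    private final Map<String, Integer> store = new HashMap<>();

    // Equivalent of Processor.process(key, value): store the value's length.
    public void process(String key, String value) {
        store.put(key, value.length());
    }

    public Integer get(String key) {
        return store.get(key);
    }

    public static void main(String[] args) {
        ValueToLengthSketch p = new ValueToLengthSketch();
        p.process("key1", "value1");
        p.process("key2", "value2");
        System.out.println(p.get("key1")); // prints 6
    }
}
```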
However, when the streams job starts up it runs the recovery process by
re-reading the source topic. I've noticed that in this case it seems to skip
the processor entirely and treats the source topic as if it were the changelog
of the state store, making the data flow like this during recovery:
source stream -> "my-store" state store
Because it treats the source topic as the changelog of the state store, it
also tries to use the state store's deserializer. This won't work, since
the state store's values should be Integers while the values in the
source stream are Strings.
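To illustrate the mismatch: an Integer deserializer expects a payload of exactly 4 bytes, so feeding it the UTF-8 bytes a String serializer wrote to the source topic fails immediately. A stdlib-only sketch mirroring the size check in Kafka's IntegerDeserializer (the helper deserializeInt is illustrative, not the Kafka class itself):

```java
import java.nio.charset.StandardCharsets;

public class DeserializerMismatch {

    // Mirrors the size check in Kafka's IntegerDeserializer: an int payload
    // must be exactly 4 big-endian bytes; anything else is rejected.
    static int deserializeInt(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException(
                "Size of data received by IntegerDeserializer is not 4");
        }
        int value = 0;
        for (byte b : data) {
            value = (value << 8) | (b & 0xFF);
        }
        return value;
    }

    public static void main(String[] args) {
        // Recovery hands the store's Integer deserializer the raw bytes that
        // STRING_SERIALIZER wrote to the source topic: 6 bytes for "value1".
        byte[] sourceBytes = "value1".getBytes(StandardCharsets.UTF_8);
        try {
            deserializeInt(sourceBytes);
        } catch (IllegalArgumentException e) {
            System.out.println("recovery would fail: " + e.getMessage());
        }
    }
}
```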
All of this will start up fine as long as the source topic has no values yet.
However, once the source topic contains (String) values, the startup recovery
process will fail, because it pipes the source records directly into the state
store instead of passing them through the processor.
I believe this is caused by the following line of code in
TopologyBuilder.addGlobalStore, which connects the store directly to the source
topic:
https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java#L507
Please let me know if I'm totally misunderstanding how global stores should
work, but I think this might be a genuine bug or design flaw.
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)