[ https://issues.apache.org/jira/browse/KAFKA-4963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15945998#comment-15945998 ]

Matthias J. Sax commented on KAFKA-4963:
----------------------------------------

Your observation is correct, and the behavior is by design. Global stores are 
not meant to hold modified data; they are only populated with the "raw" data 
from the topic they read from -- that topic serves as both source and 
changelog at once.

Thus, you would need a two-step process that "duplicates" your data: first, 
read all the data, apply your transformation, and write the modified data 
back into a separate topic. Then use that topic to populate your store.
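
For illustration, a rough sketch of this two-step approach using the 0.10.2 
DSL (the intermediate topic name "topic-transformed" is just a placeholder, 
not something from the report):

{code}
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.kstream.KStreamBuilder;

final KStreamBuilder builder = new KStreamBuilder();

// Step 1: read the raw data, apply the transformation, and write the
// result to a dedicated topic.
builder.stream(Serdes.String(), Serdes.String(), "topic")
       .mapValues(value -> value.length())
       .to(Serdes.String(), Serdes.Integer(), "topic-transformed");

// Step 2: populate the global store from the transformed topic, which now
// doubles as the store's source/changelog topic.
builder.globalTable(Serdes.String(), Serdes.Integer(), "topic-transformed", "my-store");
{code}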

\cc [~damianguy] -- I think we can close this as "not a problem"? Please 
correct me if I said anything wrong.

> Global Store: startup recovery process skipping processor
> ---------------------------------------------------------
>
>                 Key: KAFKA-4963
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4963
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0
>            Reporter: Yennick Trevels
>
> This issue is related to the recovery process of a global store. It might be 
> that I'm misunderstanding the design of the global store as it's all quite 
> new to me, but I wanted to verify this case.
> I'm trying to create a global store with a processor which transforms the 
> values from the source and puts them into the state store, and I want all 
> these transformed values to be available in every streams job (hence the 
> use of a global store).
> I'll give you an example which I created based on an existing Kafka Streams 
> unit test:
> {code}
> final StateStoreSupplier storeSupplier = Stores.create("my-store")
>         .withStringKeys()
>         .withIntegerValues()
>         .inMemory()
>         .disableLogging()
>         .build();
> final String global = "global";
> final String topic = "topic";
> // the store holds integer values, so the cast must use Integer, not String
> final KeyValueStore<String, Integer> globalStore =
>         (KeyValueStore<String, Integer>) storeSupplier.get();
> final TopologyBuilder topologyBuilder = this.builder
>         .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER,
>                 topic, "processor", define(new ValueToLengthStatefulProcessor("my-store")));
> driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
> driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
> driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
> assertEquals("value1".length(), (int) globalStore.get("key1"));
> assertEquals("value2".length(), (int) globalStore.get("key2"));
> {code}
> The ValueToLengthStatefulProcessor basically takes the incoming value, 
> calculates the length of the string, and puts the result in the state store. 
> Note the difference in types between the source stream (string values) and 
> the state store (integer values).
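> Such a processor might look roughly like this (an illustrative sketch; the 
> actual class is not included here):
> {code}
> class ValueToLengthStatefulProcessor extends AbstractProcessor<String, String> {
>     private final String storeName;
>     private KeyValueStore<String, Integer> store;
>
>     ValueToLengthStatefulProcessor(final String storeName) {
>         this.storeName = storeName;
>     }
>
>     @SuppressWarnings("unchecked")
>     @Override
>     public void init(final ProcessorContext context) {
>         super.init(context);
>         // look up the store this processor writes to
>         store = (KeyValueStore<String, Integer>) context.getStateStore(storeName);
>     }
>
>     @Override
>     public void process(final String key, final String value) {
>         // string in, integer out: store the length of the incoming value
>         store.put(key, value.length());
>     }
> }
> {code}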
> If I understand global stores correctly, and based on what I've tried out 
> already, the data flows like this:
> the "global" source stream reading values from a Kafka topic called "topic" 
> -> ValueToLengthStatefulProcessor -> "my-store" state store
> However, when the streams job starts up, it runs the recovery process by 
> reading the source topic again. I've noticed that in this case it seems to 
> skip the processor entirely and acts as if the source stream were the 
> changelog of the state store, making the data flow like this during the 
> recovery process:
> source stream -> "my-store" state store
> Because it treats the source stream as the changelog of the state store, it 
> also tries to use the state store's deserializer. This won't work, since the 
> values in the state store should be integers, while the values in the source 
> stream are strings.
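> To make the mismatch concrete (an illustrative snippet using the plain 
> serializer classes): a string payload is not a 4-byte integer encoding, so 
> the integer deserializer rejects it during restoration.
> {code}
> final byte[] bytes = new StringSerializer().serialize("topic", "value1");
> // throws SerializationException: the payload is 6 bytes, not the 4 bytes
> // that IntegerDeserializer expects
> final Integer broken = new IntegerDeserializer().deserialize("topic", bytes);
> {code}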
> So all of this starts up fine as long as the source topic has no values 
> yet. However, once the source topic contains (string) values, the startup 
> recovery process fails, because it sources records directly into the state 
> store instead of passing them through the processor.
> I believe this is caused by the following line of code in 
> TopologyBuilder.addGlobalStore, which connects the store directly to the 
> source topic.
> https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java#L507
> Please let me know if I'm totally misunderstanding how global stores should 
> work. But I think this might be a crucial bug or design flaw.



