[ https://issues.apache.org/jira/browse/KAFKA-6401?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16303964#comment-16303964 ]
Matthias J. Sax commented on KAFKA-6401:
----------------------------------------

We just recently improved the docs for Kafka Streams on the AK web page: https://kafka.apache.org/10/documentation/streams/developer-guide/interactive-queries.html

The behavior is by design -- stores can go "offline" anytime as a rebalance can happen anytime. Also, on startup, stores must first become available, which takes some time.

I am closing this as "not a problem" -- if you have further questions, please ask on the mailing list: https://kafka.apache.org/contact

> InvalidStateStoreException immediately after starting streams
> -------------------------------------------------------------
>
>                 Key: KAFKA-6401
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6401
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>         Environment: ubuntu 14.04
>            Reporter: Mostafa Asgari
>            Priority: Minor
>         Attachments: Test.java
>
>
> Hi
> I wrote a simple kafka streams application. After I start the stream, if I
> call KafkaStreams.store immediately, I will get InvalidStateStoreException:
> {code:java}
> org.apache.kafka.streams.errors.InvalidStateStoreException: the state store, my-table, may have migrated to another instance.
> {code}
> Here is the complete code :
> {code:java}
> final StreamsBuilder builder = new StreamsBuilder();
> final KTable<String, Integer> table = builder.table(TOPIC_NAME,
>         Consumed.with(Serdes.String(), Serdes.Integer(),
>                 new FailOnInvalidTimestamp(),
>                 Topology.AutoOffsetReset.EARLIEST),
>         Materialized.as("my-table"));
> Topology topology = builder.build();
> Properties props = new Properties();
> props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
> props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "10000");
> final KafkaStreams streams = new KafkaStreams(topology, props);
> Runtime.getRuntime().addShutdownHook(new Thread() {
>     @Override
>     public void run() {
>         streams.close();
>     }
> });
> streams.start();
> ReadOnlyKeyValueStore<String, Integer> store =
>         streams.store(table.queryableStoreName(), QueryableStoreTypes.keyValueStore());
> {code}
> However, if I call Thread.sleep( SOME_AMOUNT ) after the start() method, I
> no longer get the exception.
> I wonder if it is a bug or I did something wrong.
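
For readers who hit the same exception: since the comment says a store is unavailable during startup and during any rebalance, a common workaround is to retry the lookup with a short backoff instead of a fixed Thread.sleep. The following is only a minimal sketch of that idea using the Java standard library; the class StoreRetry, the method retryUntilAvailable, and its parameters are hypothetical names introduced here and are not part of Kafka Streams.

```java
import java.util.function.Supplier;

// Hypothetical helper (not a Kafka Streams API): retries a lookup while it
// keeps throwing a given "not ready yet" exception type.
final class StoreRetry {
    private StoreRetry() {}

    static <T> T retryUntilAvailable(Supplier<T> lookup,
                                     Class<? extends RuntimeException> retryOn,
                                     long backoffMs,
                                     long timeoutMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            try {
                return lookup.get();
            } catch (RuntimeException e) {
                // Rethrow unrelated exceptions, and give up once the deadline has passed.
                if (!retryOn.isInstance(e) || System.currentTimeMillis() >= deadline) {
                    throw e;
                }
            }
            try {
                Thread.sleep(backoffMs);
            } catch (InterruptedException ie) {
                // Preserve the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("Interrupted while waiting", ie);
            }
        }
    }
}
```

In the reporter's program, the lookup would presumably be a lambda wrapping the streams.store(...) call, with InvalidStateStoreException.class as the exception to retry on (it is an unchecked exception). Because a rebalance can take the store offline again later, the queries made against the returned store would need the same handling.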