Tom Dearman created KAFKA-3207:
----------------------------------

             Summary: StateStore seems to be writing state to one topic but restoring from another
                 Key: KAFKA-3207
                 URL: https://issues.apache.org/jira/browse/KAFKA-3207
             Project: Kafka
          Issue Type: Bug
          Components: kafka streams
    Affects Versions: 0.9.1.0
         Environment: MacOS El Capitan
            Reporter: Tom Dearman
            Priority: Blocker

The state store (I am using the in-memory state store) writes to a topic called [store-name] but restores from [job-id]-[store-name]-changelog. You can see in StoreChangeLogger that it writes to a topic whose name is the [store-name] passed through from the store supplier factory, but restores from the topic named above.

My topology is:

    TopologyBuilder builder = new TopologyBuilder();

    SerializerAdapter<CommonKey> commonKeyAdapter = new SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
    SerializerAdapter<GamePlayValue> gamePlayAdapter = new SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
    builder.addSource("SOURCE", commonKeyAdapter, gamePlayAdapter, kafkaStreamConfig.getGamePlayTopic());

    Duration activityInterval = kafkaStreamConfig.getActivityInterval();
    if (activityInterval.toMinutes() % 5 != 0 || 24 * 60 % activityInterval.toMinutes() != 0)
    {
        throw new SystemFaultException(
            "The game activity interval must be a multiple of 5 minutes and divide into 24 hours current value ["
                + activityInterval.toMinutes() + "]");
    }

    builder.addProcessor(
        "PROCESS",
        new GameActivitySupplier(
            kafkaStreamConfig.getStoreName(),
            kafkaStreamConfig.getGameActivitySendPeriod(),
            activityInterval,
            kafkaStreamConfig.getRemoveOldestTime(),
            kafkaStreamConfig.getRemoveAbsoluteTime()),
        "SOURCE");

    SerializerAdapter<StoreValue> storeValueAdapter = new SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
    builder.addStateStore(
        Stores.create(kafkaStreamConfig.getStoreName())
            .withKeys(commonKeyAdapter, commonKeyAdapter)
            .withValues(storeValueAdapter, storeValueAdapter)
            .inMemory()
            .build(),
        "PROCESS");

    builder.addSink(
        "SINK",
        kafkaStreamConfig.getGameActivityTopic(),
        commonKeyAdapter,
        new SerializerAdapter<GameActivityTotalMessage>(JDKBinarySerializer.INSTANCE),
        "PROCESS");
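
To make the mismatch concrete, here is a minimal standalone sketch of the two naming schemes described in this report. It does not use any Kafka classes: the class name ChangelogTopicNames, the helpers writeTopic and restoreTopic, and the sample values "my-job" and "game-store" are all hypothetical and chosen only for illustration. The only thing taken from the report is the pair of name patterns, [store-name] and [job-id]-[store-name]-changelog.

```java
public class ChangelogTopicNames {

    // Hypothetical helper: the topic name that, per this report, state is
    // written to, which is just the store name.
    static String writeTopic(String storeName) {
        return storeName;
    }

    // Hypothetical helper: the topic name that, per this report, state is
    // restored from, which is [job-id]-[store-name]-changelog.
    static String restoreTopic(String jobId, String storeName) {
        return jobId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String jobId = "my-job";         // assumed sample value
        String storeName = "game-store"; // assumed sample value

        String written = writeTopic(storeName);
        String restored = restoreTopic(jobId, storeName);

        System.out.println("writes to:     " + written);
        System.out.println("restores from: " + restored);
        // The two names differ, so a restore would read a topic that was
        // never written to by the store.
        System.out.println("same topic:    " + written.equals(restored));
    }
}
```

If the behaviour is as described, the restore topic would stay empty, so the in-memory store would come back empty after a restart even though its updates were logged to the [store-name] topic.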