[
https://issues.apache.org/jira/browse/KAFKA-3207?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Guozhang Wang reassigned KAFKA-3207:
------------------------------------
Assignee: Guozhang Wang
> StateStore seems to be writing state to one topic but restoring from another
> ----------------------------------------------------------------------------
>
> Key: KAFKA-3207
> URL: https://issues.apache.org/jira/browse/KAFKA-3207
> Project: Kafka
> Issue Type: Bug
> Components: kafka streams
> Affects Versions: 0.9.1.0
> Environment: MacOS El Capitan
> Reporter: Tom Dearman
> Assignee: Guozhang Wang
> Priority: Blocker
>
> The state store (I am using in-memory state store) writes to a topic call
> [store-name] but restores from [job-id]-[store-name]-changelog. You can see
> in StoreChangeLogger that it writes to a topic which is the [store-name]
> passed through from the store supplier factory, but restores from the above
> topic name. My topology is:
> TopologyBuilder builder = new TopologyBuilder();
> SerializerAdapter<CommonKey> commonKeyAdapter = new
> SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
> SerializerAdapter<GamePlayValue> gamePlayAdapter = new
> SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
> builder.addSource("SOURCE", commonKeyAdapter, gamePlayAdapter,
> kafkaStreamConfig.getGamePlayTopic());
> Duration activityInterval =
> kafkaStreamConfig.getActivityInterval();
> if (activityInterval.toMinutes() % 5 != 0 || 24 * 60 %
> activityInterval.toMinutes() != 0)
> {
> throw new SystemFaultException(
> "The game activity interval must be a multiple
> of 5 minutes and divide into 24 hours current value [" +
> activityInterval.toMinutes() + "]");
> }
> builder.addProcessor("PROCESS", new
> GameActivitySupplier(kafkaStreamConfig.getStoreName(),
>
> kafkaStreamConfig.getGameActivitySendPeriod(),
>
> activityInterval,
>
> kafkaStreamConfig.getRemoveOldestTime(),
>
> kafkaStreamConfig.getRemoveAbsoluteTime()), "SOURCE");
> SerializerAdapter<StoreValue> storeValueAdapter = new
> SerializerAdapter<>(JDKBinarySerializer.INSTANCE);
> builder.addStateStore(
>
> Stores.create(kafkaStreamConfig.getStoreName()).withKeys(commonKeyAdapter,
> commonKeyAdapter).withValues(
> storeValueAdapter,
> storeValueAdapter).inMemory().build(), "PROCESS");
> builder.addSink("SINK",
> kafkaStreamConfig.getGameActivityTopic(), commonKeyAdapter,
> new
> SerializerAdapter<GameActivityTotalMessage>(JDKBinarySerializer.INSTANCE),
> "PROCESS");
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)