Tomasz Bradło created KAFKA-10322:
-------------------------------------

Summary: InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
Key: KAFKA-10322
URL: https://issues.apache.org/jira/browse/KAFKA-10322
Project: Kafka
Issue Type: Bug
Components: streams
Affects Versions: 2.5.0
Environment: windows/linux
Reporter: Tomasz Bradło
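I have a regular groupBy-and-count stream configuration:
{code:java}
import java.time.Duration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.kstream.WindowedSerdes
import org.apache.kafka.streams.state.StoreBuilder
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.streams.state.WindowStore

// JsonSerde, CountableEvent, eventsCountingDeserializer and passThroughProcessorSupplier
// are defined elsewhere in the application (not shown).
fun addStream(kStreamBuilder: StreamsBuilder) {
    // In-memory window store backing the global store: 10-day retention, 1-day windows, no duplicates
    val storeSupplier = Stores.inMemoryWindowStore("count-store", Duration.ofDays(10), Duration.ofDays(1), false)
    val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
        .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())

    // Daily windowed count, written to "topic1-count" with Windowed<CountableEvent> keys
    kStreamBuilder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofDays(1)))
        .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
        .toStream()
        .to("topic1-count")

    // The global store reads "topic1-count" back using a time-windowed key serde
    val storeConsumed = Consumed.with(
        WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()),
        Serdes.Long())
    kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
}
{code}
When records are sent to "topic1-count", the key is serialized by [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java], which uses [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112], so the message key format is:
{code:java}
real_grouping_key + timestamp(8bytes)
{code}
Everything works: I can read correct values from the state store.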
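To make the on-topic key layout concrete, here is a minimal Java sketch of the bytes described above. It assumes, from a reading of WindowKeySchema.toBinary at the linked commit, that the serialized grouping key is followed directly by the 8-byte big-endian window start. The names TopicKeyLayout, toTopicKey and windowStartOf are hypothetical; this only models the byte layout and is not Kafka's implementation.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical model of the key bytes on "topic1-count" (not Kafka's code):
// serialized grouping key followed by the 8-byte big-endian window start.
final class TopicKeyLayout {
    static final int TIMESTAMP_SIZE = 8;

    private TopicKeyLayout() {}

    // Builds: real_grouping_key + timestamp(8bytes)
    static byte[] toTopicKey(byte[] groupingKey, long windowStartMs) {
        return ByteBuffer.allocate(groupingKey.length + TIMESTAMP_SIZE)
                .put(groupingKey)
                .putLong(windowStartMs)
                .array();
    }

    // Reads the window start back from the last 8 bytes of the key.
    static long windowStartOf(byte[] topicKey) {
        return ByteBuffer.wrap(topicKey).getLong(topicKey.length - TIMESTAMP_SIZE);
    }

    public static void main(String[] args) {
        byte[] groupingKey = "{\"id\":\"a\"}".getBytes(StandardCharsets.UTF_8);
        byte[] topicKey = toTopicKey(groupingKey, 1_596_067_200_000L);
        // 10 key bytes + 8 timestamp bytes
        System.out.println(topicKey.length + " bytes, window start " + windowStartOf(topicKey));
    }
}
```

Note that there is no trailing sequence number in this layout; the key ends immediately after the 8-byte timestamp.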
But in a recovery scenario, when [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317] enters the offset < highWatermark loop, the [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105] reads from "topic1-count" and fails to extract a valid key and timestamp using [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188] and [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]. It fails because these methods expect the format:
{code:java}
real_grouping_key + timestamp(8bytes) + sequence_number(4bytes)
{code}
How is this supposed to work in this case?
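The mismatch can be reproduced without Kafka in a small Java model. The two extraction methods below are modeled on a reading of the linked WindowKeySchema.extractStoreKeyBytes and extractStoreTimestamp (both assume a 12-byte suffix: 8-byte timestamp plus 4-byte sequence number); they are not copied from Kafka. The class RestoreKeyMismatch and the helpers topicKey and storeKey are hypothetical names used only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical model (not Kafka's code): the restore path assumes a 12-byte
// suffix (timestamp + sequence number), but records on the topic only carry
// an 8-byte timestamp suffix.
final class RestoreKeyMismatch {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    private RestoreKeyMismatch() {}

    // Layout written to "topic1-count": key + timestamp(8)
    static byte[] topicKey(byte[] key, long ts) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE).put(key).putLong(ts).array();
    }

    // Layout the store expects: key + timestamp(8) + seqnum(4)
    static byte[] storeKey(byte[] key, long ts, int seqnum) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(key).putLong(ts).putInt(seqnum).array();
    }

    // Modeled on extractStoreKeyBytes: drop a 12-byte suffix.
    // Throws NegativeArraySizeException if the input is shorter than 12 bytes.
    static byte[] extractStoreKeyBytes(byte[] binaryKey) {
        return Arrays.copyOf(binaryKey, binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    // Modeled on extractStoreTimestamp: read the long that starts 12 bytes from the end.
    static long extractStoreTimestamp(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    public static void main(String[] args) {
        byte[] key = "abcdefgh".getBytes(StandardCharsets.UTF_8);
        long ts = 1_596_067_200_000L;

        // Store format round-trips correctly.
        byte[] good = storeKey(key, ts, 0);
        System.out.println("store format: key=" + new String(extractStoreKeyBytes(good), StandardCharsets.UTF_8)
                + " ts=" + extractStoreTimestamp(good));

        // Topic format: key is truncated and timestamp is read from the wrong offset.
        byte[] bad = topicKey(key, ts);
        System.out.println("topic format: key=" + new String(extractStoreKeyBytes(bad), StandardCharsets.UTF_8)
                + " ts=" + extractStoreTimestamp(bad));
    }
}
```

In this model, with an 8-byte grouping key the restore-side extraction keeps only the first 4 bytes of the key and reads the timestamp from 8 bytes that straddle the end of the key and the start of the real timestamp. For grouping keys shorter than 4 bytes the computed length is negative and the copy throws.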