Tomasz Bradło created KAFKA-10322:
-------------------------------------

             Summary: InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
                 Key: KAFKA-10322
                 URL: https://issues.apache.org/jira/browse/KAFKA-10322
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.5.0
         Environment: windows/linux
            Reporter: Tomasz Bradło


I have a regular groupBy-and-count stream configuration:
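{code:java}
import java.time.Duration
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Consumed
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.TimeWindows
import org.apache.kafka.streams.kstream.WindowedSerdes
import org.apache.kafka.streams.state.StoreBuilder
import org.apache.kafka.streams.state.Stores
import org.apache.kafka.streams.state.WindowStore

// JsonSerde, CountableEvent, eventsCountingDeserializer and
// passThroughProcessorSupplier are defined elsewhere in the application.

fun addStream(kStreamBuilder: StreamsBuilder) {
    // In-memory window store: 10-day retention, 1-day windows, no duplicates.
    val storeSupplier = Stores.inMemoryWindowStore("count-store",
            Duration.ofDays(10),
            Duration.ofDays(1),
            false)
    val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = Stores
            .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long())

    // Count events per key in 1-day windows and write the counts to "topic1-count".
    kStreamBuilder
            .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
            .map { _, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null) }
            .groupByKey()
            .windowedBy(TimeWindows.of(Duration.ofDays(1)))
            .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long()))
            .toStream()
            .to("topic1-count")

    // Load "topic1-count" into a global store, reading keys back with TimeWindowedSerde.
    val storeConsumed = Consumed.with(
            WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()),
            Serdes.Long())
    kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier)
}{code}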
When writing to "topic1-count", the key is serialized with [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java], which uses [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112], so the message key format is:
{code:java}
real_grouping_key + timestamp(8bytes){code}
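For reference, a minimal standalone Java sketch of that layout follows. It is not Kafka's code: the class and method names are hypothetical, a UTF-8 string stands in for the JSON-serialized CountableEvent, and the 8-byte timestamp is assumed to be the window start written big-endian (ByteBuffer's default byte order).

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch (not Kafka's implementation) of the topic key layout:
//   real_grouping_key + timestamp(8 bytes)
// Assumption: the timestamp is the window start, written big-endian.
final class TopicKeyFormat {
    static final int TIMESTAMP_SIZE = 8;

    // Append the 8-byte window start to the serialized grouping key.
    static byte[] encode(byte[] groupingKey, long windowStart) {
        ByteBuffer buf = ByteBuffer.allocate(groupingKey.length + TIMESTAMP_SIZE);
        buf.put(groupingKey);
        buf.putLong(windowStart);
        return buf.array();
    }

    // Everything except the trailing 8 bytes is the grouping key.
    static byte[] groupingKey(byte[] topicKey) {
        return Arrays.copyOfRange(topicKey, 0, topicKey.length - TIMESTAMP_SIZE);
    }

    // The trailing 8 bytes hold the window start.
    static long windowStart(byte[] topicKey) {
        return ByteBuffer.wrap(topicKey).getLong(topicKey.length - TIMESTAMP_SIZE);
    }

    public static void main(String[] args) {
        byte[] key = "{\"id\":\"a\"}".getBytes(StandardCharsets.UTF_8); // 10 bytes
        byte[] topicKey = encode(key, 86_400_000L);
        System.out.println(topicKey.length);                                           // 18
        System.out.println(new String(groupingKey(topicKey), StandardCharsets.UTF_8)); // {"id":"a"}
        System.out.println(windowStart(topicKey));                                     // 86400000
    }
}
{code}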
 

Everything works: I can read correct values from the state store. However, in the recovery scenario, when [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317] enters the offset < highWatermark loop, the [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105] reads from "topic1-count" and fails to extract a valid key and timestamp using [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188] and [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201].

It fails because those methods expect the format:
{code:java}
real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code}
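The mismatch can be reproduced outside Kafka with a small standalone sketch. The class and helper names are hypothetical and this is not Kafka's implementation; the extract helpers only assume what is described above, namely that the restore path strips a 12-byte suffix (8-byte timestamp plus 4-byte sequence number) and reads the timestamp 12 bytes from the end. Applied to a key in the 8-byte-suffix topic format, they drop the last 4 bytes of the grouping key and return a timestamp that mixes those 4 key bytes with the first 4 bytes of the real timestamp; a grouping key shorter than 4 bytes makes the extraction throw.

{code:java}
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch (not Kafka's implementation) contrasting the two layouts:
//   topic key: real_grouping_key + timestamp(8 bytes)
//   store key: real_grouping_key + timestamp(8 bytes) + sequence_number(4 bytes)
final class StoreKeyFormat {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;
    static final int SUFFIX_SIZE = TIMESTAMP_SIZE + SEQNUM_SIZE;

    // Layout written to "topic1-count" by the windowed serializer.
    static byte[] topicKey(byte[] groupingKey, long windowStart) {
        return ByteBuffer.allocate(groupingKey.length + TIMESTAMP_SIZE)
                .put(groupingKey).putLong(windowStart).array();
    }

    // Layout the restore path expects.
    static byte[] storeKey(byte[] groupingKey, long windowStart, int seqnum) {
        return ByteBuffer.allocate(groupingKey.length + SUFFIX_SIZE)
                .put(groupingKey).putLong(windowStart).putInt(seqnum).array();
    }

    // Drops the 12-byte suffix; throws IllegalArgumentException for inputs shorter than 12 bytes.
    static byte[] extractKey(byte[] binaryKey) {
        return Arrays.copyOfRange(binaryKey, 0, binaryKey.length - SUFFIX_SIZE);
    }

    // Reads 8 bytes starting 12 bytes from the end.
    static long extractTimestamp(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey).getLong(binaryKey.length - SUFFIX_SIZE);
    }

    public static void main(String[] args) {
        byte[] key = "{\"id\":\"a\"}".getBytes(StandardCharsets.UTF_8);
        long start = 86_400_000L;

        byte[] fromStore = storeKey(key, start, 0);
        System.out.println(new String(extractKey(fromStore), StandardCharsets.UTF_8)); // {"id":"a"}
        System.out.println(extractTimestamp(fromStore));                                // 86400000

        // Topic-format key: the last 4 key bytes are lost and the timestamp is wrong.
        byte[] fromTopic = topicKey(key, start);
        System.out.println(new String(extractKey(fromTopic), StandardCharsets.UTF_8)); // {"id":
        System.out.println(extractTimestamp(fromTopic) == start);                      // false
    }
}
{code}

Running main prints the correctly decoded store-format key first, then the truncated key and a mismatched timestamp for the topic-format key.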
How is this supposed to work in this case?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
