[ https://issues.apache.org/jira/browse/KAFKA-5398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16049358#comment-16049358 ]
Matthias J. Sax commented on KAFKA-5398:
----------------------------------------

I guess that is expected behavior. I would claim that it's the user's responsibility to use the same key serdes for both streams in this case... The only thing we could do better is to make sure that there is only one key serde in the first place, i.e., make it impossible to specify two serdes, to guard the user against using the API in the wrong way.

> Joins on GlobalKTable don't work properly when combined with Avro and the Confluent Schema Registry
> ----------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-5398
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5398
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.10.2.0, 0.10.2.1
>         Environment: Kafka, Avro, Confluent Schema Registry (3.2.1)
>            Reporter: Benjamin Bargeton
>
> Joins between a {{KStream}} and a {{GlobalKTable}} do not work as expected when using the following setup:
> * Use Kafka in combination with the Confluent Schema Registry.
> * Feed a topic ({{my-global-topic}}) that will be used as the {{GlobalKTable}} input by posting some messages with an Avro {{GenericRecord}} as the key (using a traditional {{Producer}}/{{ProducerRecord}}, for example).
> The simple Avro schema for the example:
> {code:javascript}
> {
>     "type": "record",
>     "name": "AvroKey",
>     "namespace": "com.test.key",
>     "fields": [
>         {
>             "name": "anyfield",
>             "type": "string"
>         }
>     ]
> }
> {code}
> * Start a Kafka Streams application that processes messages, this time using an Avro {{SpecificRecord}} ({{AvroKey}}) generated by the Avro compiler for the same schema:
> {code:java}
> KStream<AnyKey, AnyObject> stream = builder.stream("my-stream-topic");
> GlobalKTable<AvroKey, AnyObject> globalTable = builder.globalTable("my-global-topic", "my-global-topic-store");
> stream
>     .leftJoin(globalTable, (k, v) -> new AvroKey(v.getKeyOfTheGlobalTable()), (v1, v2) -> /*the ValueJoiner*/)
>     .print("Result");
> {code}
> Note that the schema generated by Avro for the {{SpecificRecord}} differs slightly from the original one because we use String instead of CharSequence (Avro config):
> {code:javascript}
> {
>     "type": "record",
>     "name": "AvroKey",
>     "namespace": "com.test.key",
>     "fields": [
>         {
>             "name": "anyfield",
>             "type": {
>                 "type": "string",
>                 "avro.java.string": "String"
>             }
>         }
>     ]
> }
> {code}
> * Last but not least, the Confluent serializer uses bytes 1-4 of the serialized payload to store the id of the schema registered in the Schema Registry:
> http://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
> Now our issue is that when the {{RocksDBStore}} backing the {{GlobalKTable}} is initialized, it uses the {{byte[]}} taken straight from the key:
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L179
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L164
> The schemas for the producer and for the streams application differ slightly (but are compatible), so they are registered with different global ids. Since the id is contained in the binary representation, the lookup fails during the join.
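>
> To make the byte-level mismatch concrete, here is a minimal sketch of the wire format described above (the schema ids 21/42 and the key bytes are made up purely for illustration, they are not ids actually assigned by a registry):
> {code:java}
> import java.nio.ByteBuffer;
> import java.util.Arrays;
>
> // Confluent wire format: magic byte 0x0, a 4-byte big-endian schema id, then the Avro payload.
> public class WireFormatSketch {
>
>     // Frame an Avro payload the way the Confluent serializer does.
>     static byte[] frame(int schemaId, byte[] avroPayload) {
>         return ByteBuffer.allocate(1 + 4 + avroPayload.length)
>                 .put((byte) 0x0)      // magic byte
>                 .putInt(schemaId)     // schema id assigned by the registry
>                 .put(avroPayload)     // Avro binary encoding of the key
>                 .array();
>     }
>
>     public static void main(String[] args) {
>         // Same logical key (anyfield = "foo"), but the producer's schema and the
>         // streams app's schema were registered under different (hypothetical) ids.
>         byte[] avroBytes = new byte[]{0x06, 'f', 'o', 'o'};
>         byte[] keyWrittenByProducer = frame(21, avroBytes);
>         byte[] keyLookedUpByStreams = frame(42, avroBytes);
>
>         // RocksDB compares raw bytes, so the join lookup misses even though the
>         // deserialized keys would be equal.
>         System.out.println(Arrays.equals(keyWrittenByProducer, keyLookedUpByStreams)); // false
>     }
> }
> {code}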
> I didn't test it, but the issue is probably broader than just this case: if we have an upstream producer that performs a schema evolution (with a backwards-compatible change), it should lead to the same issue.
> Please note that when using a {{KTable}} instead of a {{GlobalKTable}} it works fine, because the key is first deserialized and then reserialized using the current serdes:
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java#L197
> https://github.com/apache/kafka/blob/0.10.2.1/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java#L198
> To conclude, I'm not sure I fully understand yet how all the pieces connect together for state stores, but I assume that for a {{GlobalKTable}} there should also be a deserialization/reserialization of each key before storing it in RocksDB (at least to make the {{KTable}} and {{GlobalKTable}} behaviors consistent), as sketched below.
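>
> A minimal sketch of that idea, assuming it were applied where the global store is restored ({{normalizeKey}} and its placement are hypothetical, not existing Kafka API; {{keySerde}} stands for the Avro serde configured for the {{GlobalKTable}}):
> {code:java}
> import org.apache.kafka.common.serialization.Serde;
>
> // Instead of copying the raw bytes from the global topic into RocksDB, run each
> // key through the streams app's own serde, so the stored bytes match the bytes
> // produced when the join builds its lookup key.
> public class KeyNormalizationSketch {
>
>     public static <K> byte[] normalizeKey(String topic, byte[] rawKeyBytes, Serde<K> keySerde) {
>         // Deserialize using the writer's schema (resolved via the id embedded in the bytes)...
>         K key = keySerde.deserializer().deserialize(topic, rawKeyBytes);
>         // ...then re-serialize, embedding the schema id used by this application.
>         return keySerde.serializer().serialize(topic, key);
>     }
> }
> {code}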