[ 
https://issues.apache.org/jira/browse/BEAM-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Cwik updated BEAM-10865:
-----------------------------
    Status: Resolved  (was: Open)

> Support for Kafka deserialization API with headers (since Kafka API 2.1.0)
> --------------------------------------------------------------------------
>
>                 Key: BEAM-10865
>                 URL: https://issues.apache.org/jira/browse/BEAM-10865
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-kafka
>            Reporter: Lourens Naudé
>            Assignee: Lourens Naudé
>            Priority: P2
>              Labels: KafkaIO, kafka
>   Original Estimate: 24h
>          Time Spent: 2h 20m
>  Remaining Estimate: 21h 40m
>
> References mailing list posts:
>  * Original 
> [https://lists.apache.org/thread.html/rdac09a286cab86c237cf17ed35cdc592b4079d116fc682a6f797d68b%40%3Cdev.beam.apache.org%3E]
>  * Reply from Luke 
> [https://lists.apache.org/thread.html/rfde58381e9c34da7894b2dd5325c02944411539235f2668adea5bf24%40%3Cdev.beam.apache.org%3E]
> *Design decisions*
> SpEL is used because, with a kafka-clients dependency older than 2.1.0, 
> compilation fails with:
> ```
>  required: String,byte[]
>  found: String,Headers,byte[]
>  reason: actual and formal argument lists differ in length
>  where T is a type-variable:
>  ```
> Because the default `deserialize` method that accepts headers only landed in 
> 2.1.0 via 
> [https://github.com/apache/kafka/commit/f1f719211e5f28fe5163e65dba899b1da796a8e0#diff-a4f4aee88ce5091db576139f6c610ced]
> I opted for `ConsumerSpEL#deserializeKey` and `ConsumerSpEL#deserializeValue` 
> as the API to ensure forward-looking consistency for both 
> `KafkaUnboundedReader` and `ReadFromKafkaDoFn`, as both already depended on 
> an instance thereof.
> *Not so great things*
> Using SpEL for kafka-clients API 2.1.0 onwards effectively turns the 
> deserialization path into a more expensive indirection, because the 
> deserializer methods are called via reflection (twice per record: once for 
> the key, once for the value):
> ```
>  at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58) <<<<<<<<<<<
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) <<<<<<<<<<<
>  at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) <<<<<<<<<<<
>  at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) <<<<<<<<<<<
>  at java.base/java.lang.reflect.Method.invoke(Method.java:566) <<<<<<<<<<<
>  at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
>  at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
>  at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
>  at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
>  at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
>  at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
>  at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
>  at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
>  at org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
>  at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
>  at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)
>  ```
> And effectively this penalises the more recent Kafka API versions in favor of 
> the older ones. I have not measured the overhead thereof, yet.
> *Other avenues explored*
> For runtime deserialization:
>  * Naively tried conditional compile options but the compiler cannot know 
> which kafka-clients version could be used at runtime
> For regression tests (that we don't stop passing headers in the future):
>  * I tried Mockito and Powermock implementations on both 
> `LocalDeserializerProvider` and the Integer and Long serializers in tests, 
> but found the stack to be too deep and backed out of that.
>  * Ditto for attempting to spy on `ConsumerRecord#headers()` (expecting it to 
> be called twice as often for the newer API), but again the stack is deep and 
> hard to assert on. The call itself is interesting because the 
> `ConsumerRecord` constructor used in the tests is not the one that sets 
> headers, presumably for client API compatibility too.
>  * Evaluated `ExtendedDeserializer`'s 
> [wrapper|https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/serialization/ExtendedDeserializer.Wrapper.html],
>  but `ExtendedDeserializer` is a deprecated API and there is no point in 
> bringing that in as a dependency.
> *How the current regression test works*
> This feature tests deserialization, and the whole test suite depends on the 
> `Integer` (for keys) and `Long` (for values) deserializers, so I figured it 
> makes sense to implement a key and a value deserializer that can assert the 
> behaviour. Herein also lies somewhat of a problem: the test case is a bit 
> weak because I relied on stack frames to infer the caller of the 
> `deserialize` method (the wide array of supported client versions makes 
> anything else super complex). Unfortunately, stack frames only provide class 
> and method name context, with no argument count (3 here) or argument types 
> to assert on.
> Kafka client API 1.0.0 :
> ```
>  Frame 0: java.lang.Thread.getStackTrace
>  Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
>  Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
>  Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey
>  ```
> For clients before 2.1.0, frame 3 is `ConsumerSpEL#deserializeKey`, meaning 
> the deserializer was called directly and not via a default or actual 
> implementation on `Deserializer`. Frames 1 and 2 are equal because of the 
> `super.deserialize` call.
>  
> Kafka client API 2.1.0+ :
>  ```
>  Frame 0: java.lang.Thread.getStackTrace
>  Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
>  Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
>  Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize
>  ```
> For clients 2.1.0 and beyond, frame 3 is 
> `org.apache.kafka.common.serialization.Deserializer#deserialize`. This is 
> true for the bundled deserializers used in the tests because they delegate 
> the call to the implementation on `Deserializer`. In practice this may refer 
> to an actual override implementation.
> *Feedback items and questions*
>  * Any alternatives to SpEL evaluation for this hot path API? 
> `consumer.seekToEnd` and `consumer.assign` are one-off / periodic APIs and 
> not called as often as twice per record.
>  * Ideas for a better way to test for regressions?
>  * Would it make sense to consider raising the minimum supported client API 
> in order to
>  * If this implementation (and very likely iterations thereof :)) is 
> accepted, would support for the same API on serialization be appreciated as 
> well?
> Thanks for any consideration!
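
The stack-frame check described under "How the current regression test works" can be illustrated with a minimal sketch. All class names here (`CallerProbe`, `ProbingDeserializer`, `DirectCaller`) are hypothetical stand-ins and not the code from `KafkaIOTest`. The sketch searches for the first frame above `deserialize` instead of relying on a fixed frame index, and it also shows the limitation mentioned in the description: `StackTraceElement` exposes class and method names only, not argument counts or types.

```java
// Hypothetical helper, not taken from the Beam code base.
final class CallerProbe {
  /** Returns "Class#method" of the first frame above the frames named methodName. */
  static String callerOf(String methodName) {
    StackTraceElement[] frames = Thread.currentThread().getStackTrace();
    boolean seen = false;
    for (StackTraceElement frame : frames) {
      if (frame.getMethodName().equals(methodName)) {
        seen = true; // still inside one of the deserialize frames
      } else if (seen) {
        // First frame that is not deserialize: this is the caller.
        return frame.getClassName() + "#" + frame.getMethodName();
      }
    }
    return "unknown";
  }
}

// Hypothetical stand-in for a deserializer that records who called it.
class ProbingDeserializer {
  String lastCaller;

  Integer deserialize(String topic, byte[] data) {
    lastCaller = CallerProbe.callerOf("deserialize");
    return data.length; // placeholder result, only the caller matters here
  }
}

// Hypothetical stand-in for a direct call site such as ConsumerSpEL#deserializeKey.
class DirectCaller {
  static Integer deserializeKey(ProbingDeserializer deserializer, byte[] data) {
    return deserializer.deserialize("topic", data);
  }
}
```

After calling `DirectCaller.deserializeKey(probe, bytes)`, `probe.lastCaller` names the `DirectCaller#deserializeKey` frame, which is the kind of evidence the test asserts on.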

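On the question of alternatives to SpEL for the hot path, one option that might be worth evaluating is to resolve the `deserialize` overload once per deserializer with `java.lang.invoke.MethodHandle` and reuse the handle for every record. The sketch below is an assumption-laden illustration, not Beam or Kafka code: `FakeHeaders`, `OldStyleDeserializer`, `NewStyleDeserializer` and `HeaderAwareDispatch` are hypothetical stand-ins so that it compiles without kafka-clients, and whether this is actually cheaper than the SpEL path has not been measured.

```java
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Headers type.
interface FakeHeaders {}

// Hypothetical stand-in for a pre-2.1.0 deserializer: two-argument method only.
class OldStyleDeserializer {
  public String deserialize(String topic, byte[] data) {
    return "old:" + new String(data, StandardCharsets.UTF_8);
  }
}

// Hypothetical stand-in for a 2.1.0+ deserializer: adds a headers-aware overload.
class NewStyleDeserializer extends OldStyleDeserializer {
  public String deserialize(String topic, FakeHeaders headers, byte[] data) {
    return "new:" + new String(data, StandardCharsets.UTF_8);
  }
}

final class HeaderAwareDispatch {
  private final MethodHandle handle; // resolved once, reused for every record
  private final boolean withHeaders;

  HeaderAwareDispatch(Object deserializer) {
    MethodHandles.Lookup lookup = MethodHandles.lookup();
    Class<?> type = deserializer.getClass();
    MethodHandle found;
    boolean headersSupported;
    try {
      try {
        // Prefer the headers-aware overload when the class provides it.
        found = lookup.findVirtual(type, "deserialize",
            MethodType.methodType(String.class, String.class, FakeHeaders.class, byte[].class));
        headersSupported = true;
      } catch (NoSuchMethodException e) {
        // Fall back to the two-argument method.
        found = lookup.findVirtual(type, "deserialize",
            MethodType.methodType(String.class, String.class, byte[].class));
        headersSupported = false;
      }
    } catch (ReflectiveOperationException e) {
      throw new IllegalStateException("No usable deserialize method on " + type, e);
    }
    this.handle = found.bindTo(deserializer);
    this.withHeaders = headersSupported;
  }

  String deserialize(String topic, FakeHeaders headers, byte[] data) {
    try {
      return withHeaders
          ? (String) handle.invoke(topic, headers, data)
          : (String) handle.invoke(topic, data);
    } catch (Throwable t) {
      throw new IllegalStateException("deserialize failed", t);
    }
  }
}
```

The lookup cost is paid in the constructor, so the per-record path is a single handle invocation. With the real generic `Deserializer<T>` interface the method types would differ (erased return type), so this would need adapting before any benchmark.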


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
