[
https://issues.apache.org/jira/browse/BEAM-10865?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Luke Cwik updated BEAM-10865:
-----------------------------
Status: Open (was: Triage Needed)
> Support for Kafka deserialization API with headers (since Kafka API 2.1.0)
> --------------------------------------------------------------------------
>
> Key: BEAM-10865
> URL: https://issues.apache.org/jira/browse/BEAM-10865
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Reporter: Lourens Naudé
> Assignee: Lourens Naudé
> Priority: P2
> Labels: KafkaIO, kafka
> Original Estimate: 24h
> Time Spent: 20m
> Remaining Estimate: 23h 40m
>
> References mailing list posts:
> * Original
> [https://lists.apache.org/thread.html/rdac09a286cab86c237cf17ed35cdc592b4079d116fc682a6f797d68b%40%3Cdev.beam.apache.org%3E]
> * Reply from Luke
> [https://lists.apache.org/thread.html/rfde58381e9c34da7894b2dd5325c02944411539235f2668adea5bf24%40%3Cdev.beam.apache.org%3E]
> *Design decisions*
> The reason for SpEL is because with kafka-clients API < 2.1.0 as dependency,
> compilation fails with:
> ```
> required: String,byte[]
> found: String,Headers,byte[]
> reason: actual and formal argument lists differ in length
> where T is a type-variable:
> ```
> Because the headers default API only landed in 2.1.0 via
> [https://github.com/apache/kafka/commit/f1f719211e5f28fe5163e65dba899b1da796a8e0#diff-a4f4aee88ce5091db576139f6c610ced]
> I opted for `ConsumerSpEL#deserializeKey` and `ConsumerSpEL#deserializeValue`
> as API to ensure forward looking consistency for both `KafkaUnboundedReader`
> and `ReadFromKafkaDoFn` as both already depended on an instance thereof
> *Not so great things*
> Using the SpEL for kafka-client API 2.1.0 onwards effectively turns the
> deserialization path into a more expensive indirection by calling the
> deserializer methods using reflection (2x per record, 1 x key, 1 x value):
> ```
> at
> org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
> <<<<<<<<<<<
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
> Method) <<<<<<<<<<<
> at
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> <<<<<<<<<<<
> at
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> <<<<<<<<<<<
> at java.base/java.lang.reflect.Method.invoke(Method.java:566) <<<<<<<<<<<
> at
> org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
> at
> org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
> at
> org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
> at
> org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
> at
> org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
> at
> org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
> at
> org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
> at
> org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
> at
> org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
> at
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
> at
> org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)
> ```
> And effectively this penalises the more recent Kafka API versions in favor of
> the older ones. I have not measured the overhead thereof, yet.
> *Other avenues explored*
> For runtime deserialization:
> * Naively tried conditional compile options but the compiler cannot know
> which kafka-clients version could be used at runtime
> For regression tests (that we don't stop passing headers in the future):
> * I tried Mockito and Powermock implementations on both
> `LocalDeserializerProvider` and the Integer and Long serializers in tests,
> but found the stack to be too deep and backed out of that.
> * Ditto for attempting to spy on `ConsumerRecord#headers()` (expect it to be
> called twice as much for the newer API), but again deep stack and hard to
> assert. Just the call is interesting because the constructor used for
> `ConsumerRecord` in the tests does not use the one that sets headers,
> presumably for client API compatibility too.
> * Evaluated `ExtendedSerializer`´s
> [wrapper]([https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/serialization/ExtendedDeserializer.Wrapper.html]),
> but `ExtendedSerializer` is deprecated API and no point in bringing that in
> as a dependency
> *How the current regression test works*
> I figured it makes sense given this feature tests deserialization and the
> whole test suite depends on the `Integer` (for keys) and Long (for values)
> ones to implement a key and value deserializer that can assert the behaviour.
> And herein also lies somewhat of a problem because the test case is a bit
> weak as I relied on stack frames (wide array of suppored client versions
> makes anything else super complex) to infer the caller of the `deserialize`
> method, but unfortunately only class and method name context is provide and
> no arguments size of 3 or even types on those to assert on.
> Kafka client API 1.0.0 :
> ```
> Frame 0: java.lang.Thread.getStackTrace
> Frame 1:
> org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
> Frame 2:
> org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
> Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey
> ```
> For clients before 2.1.0, frame 3 is `ConsumerSpEL#deserializeKey`, meaning
> it was called directly and not via a default or actual implementation on
> `Deserializer`. Frames 1 and 2 being equal is because of the
> `super.deserialize` call.
>
> Kafka client API 2.1.0+ :
> ```
> Frame 0: java.lang.Thread.getStackTrace
> Frame 1:
> org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
> Frame 2:
> org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
> Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize
> ```
> For clients 2.1.0 and beyond, frame 3 is
> `org.apache.kafka.common.serialization.Deserializer#deserialize`. This is
> true for the bundled deserializers used in the tests because they delegate
> the call to the implementation on `Deserializer`. In practice this may refer
> to an actual override implementation.
> Feedback items and questions
> * Any alternatives for the SpEL evaluation for this hot path API?
> `consumer.seekToEnd` and `consumer.assign` are once off / periodic APIs and
> not called as often as twice per record.
> * Ideas for a better way to test for regressions?
> * Would it make sense to consider raising the minimum supported client API
> in order to
> * If this implementation (and very likely iterations thereof :)), would
> support for the same API on serialization be appreciated as well?
> Thanks for any consideration!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)