Lourens Naudé created BEAM-10865:
------------------------------------
Summary: Support for Kafka deserialization API with headers (since
Kafka API 2.1.0)
Key: BEAM-10865
URL: https://issues.apache.org/jira/browse/BEAM-10865
Project: Beam
Issue Type: Improvement
Components: io-java-kafka
Reporter: Lourens Naudé
References mailing list posts:
* Original
[https://lists.apache.org/thread.html/rdac09a286cab86c237cf17ed35cdc592b4079d116fc682a6f797d68b%40%3Cdev.beam.apache.org%3E]
* Reply from Luke
[https://lists.apache.org/thread.html/rfde58381e9c34da7894b2dd5325c02944411539235f2668adea5bf24%40%3Cdev.beam.apache.org%3E]
*Design decisions*
SpEL is used because, with a kafka-clients API < 2.1.0 dependency, compiling a
direct call to the headers overload fails with:
```
required: String,byte[]
found: String,Headers,byte[]
reason: actual and formal argument lists differ in length
where T is a type-variable:
```
Because the headers default API only landed in 2.1.0 via
[https://github.com/apache/kafka/commit/f1f719211e5f28fe5163e65dba899b1da796a8e0#diff-a4f4aee88ce5091db576139f6c610ced]
I opted for `ConsumerSpEL#deserializeKey` and `ConsumerSpEL#deserializeValue`
as the API, to ensure forward-looking consistency for both
`KafkaUnboundedReader` and `ReadFromKafkaDoFn`, as both already depended on a
`ConsumerSpEL` instance.
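The actual implementation goes through Spring SpEL, but the underlying idea of
probing for the headers overload at runtime can be sketched with plain JDK
reflection. All class names below are hypothetical stand-ins for the
kafka-clients interfaces, not the real Beam or Kafka types:

```java
import java.lang.reflect.Method;

public class HeadersAwareDispatch {
    // Hypothetical stand-in for org.apache.kafka.common.header.Headers.
    public interface Headers {}

    // Stand-in for a pre-2.1.0 deserializer: only the 2-arg method exists.
    public static class LegacyDeserializer {
        public String deserialize(String topic, byte[] data) {
            return "legacy:" + new String(data);
        }
    }

    // Stand-in for a 2.1.0+ deserializer: adds the headers overload.
    public static class ModernDeserializer extends LegacyDeserializer {
        public String deserialize(String topic, Headers headers, byte[] data) {
            return "headers:" + new String(data);
        }
    }

    // Mirrors the idea behind ConsumerSpEL#deserializeValue: look up the
    // 3-arg headers overload reflectively and fall back to the 2-arg call,
    // so the calling code still compiles against pre-2.1.0 clients.
    public static String deserialize(Object d, String topic, Headers h, byte[] data)
            throws Exception {
        try {
            Method m = d.getClass().getMethod(
                "deserialize", String.class, Headers.class, byte[].class);
            return (String) m.invoke(d, topic, h, data);
        } catch (NoSuchMethodException e) {
            Method m = d.getClass().getMethod("deserialize", String.class, byte[].class);
            return (String) m.invoke(d, topic, data);
        }
    }

    public static void main(String[] args) throws Exception {
        Headers h = new Headers() {};
        System.out.println(deserialize(new LegacyDeserializer(), "t", h, "k".getBytes()));
        // prints legacy:k
        System.out.println(deserialize(new ModernDeserializer(), "t", h, "k".getBytes()));
        // prints headers:k
    }
}
```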
*Not so great things*
Using SpEL from kafka-clients API 2.1.0 onwards effectively turns the
deserialization path into a more expensive indirection: the deserializer
methods are invoked via reflection (2x per record: 1x key, 1x value):
```
at
org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
<<<<<<<<<<<
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native
Method) <<<<<<<<<<<
at
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
<<<<<<<<<<<
at
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
<<<<<<<<<<<
at java.base/java.lang.reflect.Method.invoke(Method.java:566) <<<<<<<<<<<
at
org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
at
org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
at
org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
at
org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
at
org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
at
org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
at
org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
at
org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
at
org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
at
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
at
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)
```
This effectively penalises the more recent Kafka API versions in favour of the
older ones. I have not measured the overhead yet.
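A rough way to compare the two paths could look like the sketch below: a crude
nanoTime loop rather than a proper JMH benchmark, with all names hypothetical
(it times a direct call against the reflective indirection that SpEL
ultimately performs per record):

```java
import java.lang.reflect.Method;

public class ReflectionOverheadSketch {
    public static String deserialize(String topic, byte[] data) {
        return new String(data);
    }

    // Returns {directNanos, reflectiveNanos} over n calls each.
    public static long[] measure(int n) throws Exception {
        byte[] payload = "value".getBytes();
        Method m = ReflectionOverheadSketch.class.getMethod(
            "deserialize", String.class, byte[].class);

        // Warm up both paths so the JIT compiles them before timing.
        for (int i = 0; i < n; i++) {
            deserialize("t", payload);
            m.invoke(null, "t", payload);
        }

        long t0 = System.nanoTime();
        for (int i = 0; i < n; i++) deserialize("t", payload);
        long direct = System.nanoTime() - t0;

        t0 = System.nanoTime();
        for (int i = 0; i < n; i++) m.invoke(null, "t", payload);
        long reflective = System.nanoTime() - t0;

        return new long[] {direct, reflective};
    }

    public static void main(String[] args) throws Exception {
        long[] ns = measure(1_000_000);
        System.out.printf("direct: %d ns total, reflective: %d ns total%n", ns[0], ns[1]);
    }
}
```

A JMH harness would give trustworthy numbers; this only illustrates where the
cost sits.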
*Other avenues explored*
For runtime deserialization:
* Naively tried conditional compile options but the compiler cannot know which
kafka-clients version could be used at runtime
For regression tests (ensuring we don't stop passing headers in the future):
* I tried Mockito and Powermock implementations on both
`LocalDeserializerProvider` and the Integer and Long serializers in tests, but
found the stack to be too deep and backed out of that.
* Ditto for attempting to spy on `ConsumerRecord#headers()` (expecting it to
be called twice as often with the newer API), but again the stack was deep and
hard to assert on. The call is interesting in itself because the
`ConsumerRecord` constructor used in the tests is not the one that sets
headers, presumably also for client API compatibility.
* Evaluated `ExtendedDeserializer`'s
[wrapper|https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/serialization/ExtendedDeserializer.Wrapper.html],
but `ExtendedDeserializer` is a deprecated API and there is no point bringing
it in as a dependency.
*How the current regression test works*
Given that this feature concerns deserialization, and the whole test suite
depends on the `Integer` (for keys) and `Long` (for values) deserializers, it
made sense to implement a key and a value deserializer that can assert the
behaviour. Herein also lies somewhat of a problem: the test case is a bit
weak, because I relied on stack frames (the wide array of supported client
versions makes anything else super complex) to infer the caller of the
`deserialize` method. Unfortunately, only class and method name context is
provided; there is no argument count (3) or argument types to assert on.
Kafka client API 1.0.0:
```
Frame 0: java.lang.Thread.getStackTrace
Frame 1:
org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 2:
org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey
```
For clients before 2.1.0, frame 3 is `ConsumerSpEL#deserializeKey`, meaning
the deserializer was called directly and not via a default or actual
implementation on `Deserializer`. Frames 1 and 2 are equal because of the
`super.deserialize` call.
Kafka client API 2.1.0+:
```
Frame 0: java.lang.Thread.getStackTrace
Frame 1:
org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 2:
org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize
```
For clients 2.1.0 and beyond, frame 3 is
`org.apache.kafka.common.serialization.Deserializer#deserialize`. This is true
for the bundled deserializers used in the tests because they delegate the call
to the implementation on `Deserializer`. In practice this may refer to an
actual override implementation.
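The frame-based assertion described above can be sketched as follows. This is
a simplified, self-contained stand-in for
`KafkaIOTest$IntegerDeserializerWithHeadersAssertor`: the base class here is
hypothetical, not the real kafka-clients `IntegerDeserializer`:

```java
public class StackFrameAssertor {
    // Hypothetical stand-in for the kafka-clients IntegerDeserializer
    // that the real test subclasses.
    public static class BaseDeserializer {
        public Integer deserialize(String topic, byte[] data) {
            return data.length; // placeholder decoding
        }
    }

    // Records who called deserialize(), so a test can distinguish the
    // direct ConsumerSpEL#deserializeKey path (pre-2.1.0) from the
    // Deserializer#deserialize default-method path (2.1.0+).
    public static class AssertingDeserializer extends BaseDeserializer {
        public static String lastCaller;

        @Override
        public Integer deserialize(String topic, byte[] data) {
            StackTraceElement[] frames = Thread.currentThread().getStackTrace();
            // Frame 0 is getStackTrace, frame 1 is this method; the first
            // frame from a different class is the interesting caller. Only
            // class and method names are available: argument counts and
            // types cannot be recovered from a stack trace.
            for (StackTraceElement f : frames) {
                if (!f.getClassName().equals(AssertingDeserializer.class.getName())
                        && !f.getClassName().equals(Thread.class.getName())) {
                    lastCaller = f.getClassName() + "#" + f.getMethodName();
                    break;
                }
            }
            return super.deserialize(topic, data);
        }
    }

    public static void main(String[] args) {
        new AssertingDeserializer().deserialize("topic", new byte[4]);
        System.out.println(AssertingDeserializer.lastCaller);
    }
}
```

A test would then assert that `lastCaller` names either
`ConsumerSpEL#deserializeKey` or `Deserializer#deserialize`, depending on the
client version on the classpath.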
*Feedback items and questions*
* Any alternatives to the SpEL evaluation for this hot-path API?
`consumer.seekToEnd` and `consumer.assign` are one-off / periodic APIs and are
not called as often as twice per record.
* Ideas for a better way to test for regressions?
* Would it make sense to consider raising the minimum supported client API in
order to avoid the SpEL indirection?
* If this implementation (and very likely iterations thereof :)) lands, would
support for the same API on serialization be appreciated as well?
Thanks for any consideration!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)