methodmissing opened a new pull request #12794:
URL: https://github.com/apache/beam/pull/12794


   TLDR  Let KafkaIO support the deserializer API with headers
   
     References mailing list posts:
   
     * Original 
https://lists.apache.org/thread.html/rdac09a286cab86c237cf17ed35cdc592b4079d116fc682a6f797d68b%40%3Cdev.beam.apache.org%3E
     * Reply from Luke 
https://lists.apache.org/thread.html/rfde58381e9c34da7894b2dd5325c02944411539235f2668adea5bf24%40%3Cdev.beam.apache.org%3E
   
   ### Design decisions
   
   The reason for SpEL is because with kafka-clients API < 2.1.0 as dependency, 
compilation fails with:
   
   ```
     required: String,byte[]
     found: String,Headers,byte[]
     reason: actual and formal argument lists differ in length
     where T is a type-variable:
   ```
   
   Because the headers default API only landed in 2.1.0 via 
https://github.com/apache/kafka/commit/f1f719211e5f28fe5163e65dba899b1da796a8e0#diff-a4f4aee88ce5091db576139f6c610ced
   
   I opted for `ConsumerSpEL#deserializeKey` and 
`ConsumerSpEL#deserializeValue` as API to ensure forward looking consistency 
for both `KafkaUnboundedReader` and `ReadFromKafkaDoFn` as both already 
depended on an instance thereof.
   
   ### Not so great things
   
   Using the SpEL for kafka-client API 2.1.0 onwards effectively turns the 
deserialization path into a more expensive indirection by calling the 
deserializer methods using reflection (2x per record, 1 x key, 1 x value):
   
   ```
        at 
org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58)
 <<<<<<<<<<<
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
<<<<<<<<<<<
        at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 <<<<<<<<<<<
        at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 <<<<<<<<<<<
        at java.base/java.lang.reflect.Method.invoke(Method.java:566) 
<<<<<<<<<<<
        at 
org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
        at 
org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
        at 
org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
        at 
org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
        at 
org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
        at 
org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
        at 
org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
        at 
org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
        at 
org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
        at 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
        at 
org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)
   ```
   
   And effectively this penalizes the more recent Kafka API versions in favor 
of the older ones. I have not measured the overhead thereof, yet.
   
   ### Other avenues explored
   
   For runtime deserialization:
   
   * Naively tried conditional compile options but the compiler cannot know 
which kafka-clients version could be used at runtime
   
   For regression tests (that we don't stop passing headers in the future):
   
   * I tried Mockito and Powermock implementations on both 
`LocalDeserializerProvider` and the Integer and Long serializers in tests, but 
found the stack to be too deep and backed out of that.
   
   * Ditto for attempting to spy on `ConsumerRecord#headers()` (expect it to be 
called twice as much for the newer API), but again deep stack and hard to 
assert. Just the call is interesting because the constructor used for 
`ConsumerRecord` in the tests does not use the one that sets headers, 
presumably for client API compatibility too.
   
   * Evaluated `ExtendedSerializer`´s 
[wrapper](https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/serialization/ExtendedDeserializer.Wrapper.html),
 but `ExtendedSerializer` is deprecated API and no point in bringing that in as 
a dependency
   
   ### How the current regression test works
   
   I figured it makes sense given this feature tests deserialization and the 
whole test suite depends on the `Integer` (for keys) and Long (for values) ones 
to implement a key and value deserializer that can assert the behaviour. And 
herein also lies somewhat of a problem because the test case is a bit weak as I 
relied on stack frames (wide array of suppored client versions makes anything 
else super complex) to infer the caller of the `deserialize` method, but 
unfortunately only class and method name context is provide and no arguments 
size of 3 or even types on those to assert on.
   
   Kafka client API 1.0.0 :
   
   ```
   Frame 0: java.lang.Thread.getStackTrace
   Frame 1: 
org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
   Frame 2: 
org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
   Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey
   ```
   
   For clients before 2.1.0, frame 3 is `ConsumerSpEL#deserializeKey`, meaning 
it was called directly and not via a default or actual implementation on 
`Deserializer`. Frames 1 and 2 being equal is because of the 
`super.deserialize` call.
   
   Kafka client API 2.1.0+ :
   ```
   Frame 0: java.lang.Thread.getStackTrace
   Frame 1: 
org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
   Frame 2: 
org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
   Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize
   ```
   
   For clients 2.1.0 and beyond, frame 3 is 
`org.apache.kafka.common.serialization.Deserializer#deserialize`. This is true 
for the bundled deserializers used in the tests because they delegate the call 
to the implementation on `Deserializer`. In practice this may refer to an 
actual override implementation.
   
   ### Feedback items for me
   
   * Any alternatives for the SpEL evaluation for this hot path API? 
`consumer.seekToEnd` and `consumer.assign` are once off / periodic APIs and not 
called as often as twice per record.
   * Ideas for a better way to test for regressions?
   
   ### Questions
   
   * Would it make sense to consider raising the minimum supported client API 
in order to 
   * If this implementation (and very likely iterations thereof :-)), would 
support for the same API on serialization be appreciated as well?
   
   
   ------------------------
   
   Thank you for your contribution! Follow this checklist to help us 
incorporate your contribution quickly and easily:
   
    - [ ] [**Choose 
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and 
mention them in a comment (`R: @username`).
    - [ ] Format the pull request title like `[BEAM-XXX] Fixes bug in 
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA 
issue, if applicable. This will automatically link the pull request to the 
issue.
    - [ ] Update `CHANGES.md` with noteworthy changes.
    - [ ] If this contribution is large, please file an Apache [Individual 
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
   
   See the [Contributor Guide](https://beam.apache.org/contribute) for more 
tips on [how to make review process 
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
   
   Post-Commit Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   
   Lang | SDK | Dataflow | Flink | Samza | Spark | Twister2
   --- | --- | --- | --- | --- | --- | ---
   Go | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Flink/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Go_VR_Spark/lastCompletedBuild/)
 | ---
   Java | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Dataflow_Java11/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/badge/i
 
con)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Flink_Java11/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Batch/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Flink_Streaming/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Samza/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Spark/lastCompletedBuild/)<br>[![Build
 Status](htt
 
ps://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_PVR_Spark_Batch/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_SparkStructuredStreaming/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Java_ValidatesRunner_Twister2/lastCompletedBuild/)
   Python | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python2/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python36/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python37/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python38/lastCompletedBuild/)
 | [![Build Status](https://ci-beam.apache.org/job/beam_PostCommit_
 
Py_VR_Dataflow/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_VR_Dataflow_V2/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Py_ValCont/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Cron/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python35_VR_Flink/lastCompletedBuild/)
 | --- | [![Build Status](https://ci-beam.apache.org/job/beam_P
 
ostCommit_Python_VR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_Python_VR_Spark/lastCompletedBuild/)
 | ---
   XLang | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Direct/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Flink/lastCompletedBuild/)
 | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PostCommit_XVR_Spark/lastCompletedBuild/)
 | ---
   
   Pre-Commit Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   
   --- |Java | Python | Go | Website | Whitespace | Typescript
   --- | --- | --- | --- | --- | --- | ---
   Non-portable | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Java_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Python_Cron/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonLint_Cron/lastCompletedBuild/)<br>[![Build
 
Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocker_Cron/lastCompletedBuild/)
 <br>[![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_PythonDocs_Cron/lastCompletedBuild/)
 | [![Build Status](https://ci-beam.apache.org/job/be
 
am_PreCommit_Go_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Go_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Website_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Whitespace_Cron/lastCompletedBuild/)
 | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Typescript_Cron/lastCompletedBuild/)
   Portable | --- | [![Build 
Status](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/badge/icon)](https://ci-beam.apache.org/job/beam_PreCommit_Portable_Python_Cron/lastCompletedBuild/)
 | --- | --- | --- | ---
   
   See 
[.test-infra/jenkins/README](https://github.com/apache/beam/blob/master/.test-infra/jenkins/README.md)
 for trigger phrase, status and link of all Jenkins jobs.
   
   
   GitHub Actions Tests Status (on master branch)
   
------------------------------------------------------------------------------------------------
   [![Build python source distribution and 
wheels](https://github.com/apache/beam/workflows/Build%20python%20source%20distribution%20and%20wheels/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
   [![Python 
tests](https://github.com/apache/beam/workflows/Python%20tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
   [![Java 
tests](https://github.com/apache/beam/workflows/Java%20Tests/badge.svg?branch=master&event=schedule)](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
   
   See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more 
information about GitHub Actions CI.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to