[ https://issues.apache.org/jira/browse/BEAM-10865?focusedWorklogId=486601&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-486601 ]
ASF GitHub Bot logged work on BEAM-10865:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 20/Sep/20 01:56
Start Date: 20/Sep/20 01:56
Worklog Time Spent: 10m
Work Description: methodmissing commented on pull request #12794:
URL: https://github.com/apache/beam/pull/12794#issuecomment-695482531
Hi Luke,
I got it working with a resolution strategy that forces `kafka-clients:2.1.0`
for the `kafkaVersion210` configuration. It's not super clean and I'd prefer
not to have to use it, but I think the `provided library.java.kafka_clients`
dependency is hard to substitute otherwise.
Original Kafka test, default API:
<img width="1486" alt="Screenshot 2020-09-20 at 01 45 31" src="https://user-images.githubusercontent.com/379/93692551-8fcca200-faec-11ea-992e-1b61cb12396f.png">
API 2.1.0 specific test:
<img width="1201" alt="Screenshot 2020-09-20 at 01 46 00" src="https://user-images.githubusercontent.com/379/93692562-9bb86400-faec-11ea-96d8-6e27bff4da58.png">
I reverse-tested with the following corruption of the 2.1.0 path in
`ConsumerSpEL` to confirm that API 2.1.0 was loaded at runtime:
```diff
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
index 9ca5bfc990..37a4a2bc8e 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ConsumerSpEL.java
@@ -132,7 +132,7 @@ class ConsumerSpEL {
     mapContext.setVariable("deserializer", deserializer);
     mapContext.setVariable("topic", rawRecord.topic());
     mapContext.setVariable("headers", rawRecord.headers());
-    mapContext.setVariable("data", isKey ? rawRecord.key() : rawRecord.value());
+    mapContext.setVariable("data", isKey ? rawRecord.value() : rawRecord.key());
     return deserializeWithHeadersExpression.getValue(mapContext);
   }
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 486601)
Remaining Estimate: 22h (was: 22h 10m)
Time Spent: 2h (was: 1h 50m)
> Support for Kafka deserialization API with headers (since Kafka API 2.1.0)
> --------------------------------------------------------------------------
>
> Key: BEAM-10865
> URL: https://issues.apache.org/jira/browse/BEAM-10865
> Project: Beam
> Issue Type: Improvement
> Components: io-java-kafka
> Reporter: Lourens Naudé
> Assignee: Lourens Naudé
> Priority: P2
> Labels: KafkaIO, kafka
> Original Estimate: 24h
> Time Spent: 2h
> Remaining Estimate: 22h
>
> Mailing list references:
> * Original
> [https://lists.apache.org/thread.html/rdac09a286cab86c237cf17ed35cdc592b4079d116fc682a6f797d68b%40%3Cdev.beam.apache.org%3E]
> * Reply from Luke
> [https://lists.apache.org/thread.html/rfde58381e9c34da7894b2dd5325c02944411539235f2668adea5bf24%40%3Cdev.beam.apache.org%3E]
> *Design decisions*
> The reason for SpEL is that, with a kafka-clients API < 2.1.0 dependency,
> compilation fails with:
> ```
> required: String,byte[]
> found: String,Headers,byte[]
> reason: actual and formal argument lists differ in length
> where T is a type-variable:
> ```
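> For context, the header-aware overload that 2.1.0 adds to `Deserializer` is a
> default method delegating to the two-argument form; a paraphrased sketch of
> the interface shape (an assumption for illustration, not the verbatim Kafka
> source):
> ```java
> import java.io.Closeable;
> import org.apache.kafka.common.header.Headers;
>
> // Paraphrased shape of org.apache.kafka.common.serialization.Deserializer
> // as of kafka-clients 2.1.0 (configure/close omitted; not verbatim).
> public interface Deserializer<T> extends Closeable {
>   T deserialize(String topic, byte[] data);
>
>   // Added in 2.1.0: header-aware overload with a default implementation
>   // delegating to the headerless form, so pre-existing deserializers still
>   // compile while callers can always pass headers.
>   default T deserialize(String topic, Headers headers, byte[] data) {
>     return deserialize(topic, data);
>   }
> }
> ```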
> Because the headers default API only landed in 2.1.0 via
> [https://github.com/apache/kafka/commit/f1f719211e5f28fe5163e65dba899b1da796a8e0#diff-a4f4aee88ce5091db576139f6c610ced],
> I opted for `ConsumerSpEL#deserializeKey` and `ConsumerSpEL#deserializeValue`
> as the API, to ensure forward-looking consistency for both
> `KafkaUnboundedReader` and `ReadFromKafkaDoFn`, as both already depended on
> an instance thereof. A sketch of the resulting dispatch follows.
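> A minimal sketch of that dispatch, assuming the expression text and the
> variable names shown in the PR diff above (paraphrased, not the exact Beam
> code):
> ```java
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.common.serialization.Deserializer;
> import org.springframework.expression.Expression;
> import org.springframework.expression.spel.standard.SpelExpressionParser;
> import org.springframework.expression.spel.support.StandardEvaluationContext;
>
> // Minimal sketch of the SpEL-based dispatch (expression text assumed).
> class DeserializeWithHeadersSketch {
>   // Parsed once; SpEL resolves the 3-arg overload by name at evaluation
>   // time, so this compiles even against a kafka-clients version that lacks
>   // the header-aware method.
>   private static final Expression DESERIALIZE_WITH_HEADERS =
>       new SpelExpressionParser()
>           .parseExpression("#deserializer.deserialize(#topic, #headers, #data)");
>
>   // Mirrors what deserializeKey/deserializeValue funnel into: bind the
>   // per-record variables and let SpEL pick the 3-arg overload reflectively.
>   static Object deserialize(
>       Deserializer<?> deserializer, ConsumerRecord<byte[], byte[]> rawRecord, boolean isKey) {
>     StandardEvaluationContext mapContext = new StandardEvaluationContext();
>     mapContext.setVariable("deserializer", deserializer);
>     mapContext.setVariable("topic", rawRecord.topic());
>     mapContext.setVariable("headers", rawRecord.headers());
>     mapContext.setVariable("data", isKey ? rawRecord.key() : rawRecord.value());
>     return DESERIALIZE_WITH_HEADERS.getValue(mapContext);
>   }
> }
> ```
> Deferring method resolution to a string expression is precisely what sidesteps
> the compile error above, at the cost of the reflective hot path described next.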
> *Not so great things*
> Using SpEL for kafka-clients API 2.1.0 onwards effectively turns the
> deserialization path into a more expensive indirection by invoking the
> deserializer methods through reflection (twice per record: once for the key,
> once for the value):
> ```
> at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:58) <<<<<<<<<<<
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method) <<<<<<<<<<<
> at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) <<<<<<<<<<<
> at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) <<<<<<<<<<<
> at java.base/java.lang.reflect.Method.invoke(Method.java:566) <<<<<<<<<<<
> at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:117)
> at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:134)
> at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:52)
> at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:377)
> at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
> at org.springframework.expression.spel.ast.SpelNodeImpl.getValue(SpelNodeImpl.java:121)
> at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:262)
> at org.apache.beam.sdk.io.kafka.ConsumerSpEL.evaluateDeserializeWithHeaders(ConsumerSpEL.java:134)
> at org.apache.beam.sdk.io.kafka.ConsumerSpEL.deserializeValue(ConsumerSpEL.java:174)
> at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.advance(KafkaUnboundedReader.java:195)
> at org.apache.beam.sdk.io.kafka.KafkaUnboundedReader.start(KafkaUnboundedReader.java:137)
> ```
> This effectively penalises the more recent Kafka API versions in favor of
> the older ones. I have not measured the overhead yet (see the sketch below).
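> A minimal JMH sketch one could use to measure that overhead; the harness,
> class name, and payload are illustrative assumptions, not part of this change:
> ```java
> import java.nio.ByteBuffer;
> import org.apache.kafka.common.serialization.IntegerDeserializer;
> import org.openjdk.jmh.annotations.Benchmark;
> import org.openjdk.jmh.annotations.Scope;
> import org.openjdk.jmh.annotations.State;
> import org.springframework.expression.Expression;
> import org.springframework.expression.spel.standard.SpelExpressionParser;
> import org.springframework.expression.spel.support.StandardEvaluationContext;
>
> // Illustrative JMH harness: direct Deserializer call vs SpEL reflective
> // dispatch, approximating the per-record cost on the read path.
> @State(Scope.Benchmark)
> public class DeserializeOverheadBenchmark {
>   private final IntegerDeserializer deserializer = new IntegerDeserializer();
>   private final byte[] payload = ByteBuffer.allocate(4).putInt(42).array();
>   private final Expression viaSpel =
>       new SpelExpressionParser()
>           .parseExpression("#deserializer.deserialize(#topic, #data)");
>
>   @Benchmark
>   public Integer direct() {
>     return deserializer.deserialize("topic", payload);
>   }
>
>   @Benchmark
>   public Object reflective() {
>     // Rebuilds the evaluation context per invocation, mirroring per-record use.
>     StandardEvaluationContext ctx = new StandardEvaluationContext();
>     ctx.setVariable("deserializer", deserializer);
>     ctx.setVariable("topic", "topic");
>     ctx.setVariable("data", payload);
>     return viaSpel.getValue(ctx);
>   }
> }
> ```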
> *Other avenues explored*
> For runtime deserialization:
> * Naively tried conditional compilation options, but the compiler cannot know
> which kafka-clients version will be used at runtime.
> For regression tests (ensuring we don't stop passing headers in the future):
> * I tried Mockito and PowerMock implementations on both
> `LocalDeserializerProvider` and the Integer and Long serializers in tests,
> but found the stack to be too deep and backed out of that.
> * Ditto for attempting to spy on `ConsumerRecord#headers()` (expecting it to
> be called twice as often for the newer API), but again the stack is deep and
> hard to assert on. Only the call itself is interesting, because the
> `ConsumerRecord` constructor used in the tests is not the one that sets
> headers, presumably for client API compatibility too.
> * Evaluated `ExtendedDeserializer`'s
> [wrapper](https://kafka.apache.org/0110/javadoc/org/apache/kafka/common/serialization/ExtendedDeserializer.Wrapper.html),
> but `ExtendedDeserializer` is a deprecated API, so there is no point in
> bringing it in as a dependency.
> *How the current regression test works*
> Given that this feature concerns deserialization and the whole test suite
> depends on the `Integer` (for keys) and `Long` (for values) deserializers, I
> figured it makes sense to implement a key and value deserializer that can
> assert the behaviour.
> Herein also lies somewhat of a problem: the test case is a bit weak. I relied
> on stack frames to infer the caller of the `deserialize` method (the wide
> array of supported client versions makes anything else super complex), but
> unfortunately only class and method name context is provided; there is no
> argument count of 3, nor argument types, to assert on. A sketch follows the
> frame listings below.
> Kafka client API 1.0.0:
> ```
> Frame 0: java.lang.Thread.getStackTrace
> Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
> Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
> Frame 3: org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey
> ```
> For clients before 2.1.0, frame 3 is `ConsumerSpEL#deserializeKey`, meaning
> the deserializer was called directly and not via a default or overridden
> implementation on `Deserializer`. Frames 1 and 2 are equal because of the
> `super.deserialize` call.
>
> Kafka client API 2.1.0+:
> ```
> Frame 0: java.lang.Thread.getStackTrace
> Frame 1: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
> Frame 2: org.apache.beam.sdk.io.kafka.KafkaIOTest$IntegerDeserializerWithHeadersAssertor#deserialize
> Frame 3: org.apache.kafka.common.serialization.Deserializer#deserialize
> ```
> For clients 2.1.0 and beyond, frame 3 is
> `org.apache.kafka.common.serialization.Deserializer#deserialize`. This holds
> for the bundled deserializers used in the tests because they do not override
> the header-aware method, so the call goes through the default implementation
> on `Deserializer`. In practice frame 3 may instead refer to an actual
> override implementation.
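> A minimal sketch of such an assertor; the class name, frame index, and
> expected caller names are illustrative assumptions based on the listings
> above, not the exact test code:
> ```java
> import org.apache.kafka.common.serialization.IntegerDeserializer;
>
> // Illustrative assertor: infer the caller of deserialize() from the stack
> // to distinguish the pre-2.1.0 direct call from the 2.1.0+ default method.
> public class IntegerDeserializerWithHeadersAssertor extends IntegerDeserializer {
>   @Override
>   public Integer deserialize(String topic, byte[] data) {
>     StackTraceElement[] frames = Thread.currentThread().getStackTrace();
>     // Frame index 3 matches the listings above; only class and method names
>     // are available here, not argument counts or types.
>     StackTraceElement caller = frames[3];
>     String callerName = caller.getClassName() + "#" + caller.getMethodName();
>     // A real test would compare callerName against
>     // "org.apache.beam.sdk.io.kafka.ConsumerSpEL#deserializeKey" (< 2.1.0) or
>     // "org.apache.kafka.common.serialization.Deserializer#deserialize" (2.1.0+).
>     return super.deserialize(topic, data);
>   }
> }
> ```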
> *Feedback items and questions*
> * Any alternatives to the SpEL evaluation for this hot-path API?
> `consumer.seekToEnd` and `consumer.assign` are once-off / periodic APIs and
> are not called as often as twice per record.
> * Ideas for a better way to test for regressions?
> * Would it make sense to consider raising the minimum supported client API
> in order to
> * If this implementation (and very likely iterations thereof :)) is accepted,
> would support for the same API on serialization be appreciated as well?
> Thanks for any consideration!
--
This message was sent by Atlassian Jira
(v8.3.4#803005)