[https://issues.apache.org/jira/browse/BEAM-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377162#comment-17377162]
CannonFodder edited comment on BEAM-7870 at 7/8/21, 7:18 AM:
-------------------------------------------------------------
I encountered this problem too.
Environment:
CentOS 7
anaconda
python 3.8.8
java 1.8.0_292
beam 2.30.0
Stacktrace:
Caused by: java.lang.RuntimeException: Couldn't infer Coder from class org.apache.kafka.common.serialization.StringDeserializer
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read$Builder.resolveCoder(KafkaIO.java:743)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read$Builder.buildExternal(KafkaIO.java:680)
    at org.apache.beam.sdk.io.kafka.KafkaIO$Read$Builder.buildExternal(KafkaIO.java:623)
    at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:128)
My current workaround is to leave those deserializers unspecified and decode the bytes in Python with decode('utf-8') instead:
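{{
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

# p (the Pipeline) and conf (the Kafka consumer config dict) are defined elsewhere.
# key_deserializer/value_deserializer are left at their ByteArrayDeserializer
# defaults, so records arrive as raw bytes.
msg_kv_bytes = (p
    | 'ReadKafka' >> ReadFromKafka(consumer_config=conf, topics=['LaneIn'],
                                   max_num_records=20))
# Decode key and value in Python instead of using StringDeserializer.
messages = msg_kv_bytes | 'Decode' >> beam.MapTuple(
    lambda k, v: (k.decode('utf-8'), v.decode('utf-8')))
}}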
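The decode lambda in the workaround calls .decode() on both the key and the value. That raises AttributeError for any record whose key reaches Python as None, which is possible because Kafka allows unkeyed records. Below is a minimal null-safe variant. It assumes a missing key or value surfaces as None; I have not verified how Beam 2.30's cross-language KafkaIO represents it. `decode_kafka_kv` is a hypothetical helper name, not part of Beam.

```python
from typing import Optional, Tuple


def decode_kafka_kv(key: Optional[bytes], value: Optional[bytes],
                    encoding: str = "utf-8") -> Tuple[Optional[str], Optional[str]]:
    """Decode a raw Kafka (key, value) pair of bytes into strings.

    Assumption: an absent key (or a tombstone's absent value) arrives as None,
    so None is passed through instead of calling .decode() on it.
    """
    decoded_key = key.decode(encoding) if key is not None else None
    decoded_value = value.decode(encoding) if value is not None else None
    return decoded_key, decoded_value
```

It can replace the lambda as `msg_kv_bytes | 'Decode' >> beam.MapTuple(decode_kafka_kv)`. This works because MapTuple unpacks each (key, value) element into the function's positional arguments.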
> Externally configured KafkaIO / PubsubIO consumer causes coder problems
> -----------------------------------------------------------------------
>
> Key: BEAM-7870
> URL: https://issues.apache.org/jira/browse/BEAM-7870
> Project: Beam
> Issue Type: Bug
> Components: runner-flink, sdk-java-core
> Reporter: Maximilian Michels
> Priority: P3
> Labels: Clarified
>
> There are limitations that prevent the consumer from working correctly. The
> biggest issue is the structure of KafkaIO itself, which combines the source
> interface with DoFns to generate the desired output. The Flink Runner
> translates the source interface natively to support unbounded sources in
> portability, while the DoFn runs in a Java SDK environment. Transferring data
> between the two requires a coder. However, the initial read does not
> immediately drop the KafkaRecord structure, which conflicts with our current
> assumption that only "standard coders" present in all SDKs are supported. Only
> the subsequent DoFn converts the KafkaRecord structure into a raw
> KV[byte, byte], but the DoFn won't have the coder available in its environment.
> There are several possible solutions:
> 1. Make the DoFn that drops the KafkaRecord structure a native Java transform
> in the Flink Runner
> 2. Modify KafkaIO to immediately drop the KafkaRecord structure
> 3. Add the KafkaRecordCoder to all SDKs
> 4. Add a generic coder, e.g. AvroCoder to all SDKs
> For a workaround that uses (3), see this patch. It is not a proper fix, but it
> adds KafkaRecordCoder to the SDK so that it can be used to encode/decode
> records:
> [https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]
>
> See also [https://github.com/apache/beam/pull/8251]
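Option 2 in the description above means turning each record into a plain KV of bytes before it crosses the runner/SDK boundary. In that form, a simple length-prefixed byte encoding that any SDK can implement is enough to carry it. The Python sketch below illustrates the idea under several assumptions:

* `KafkaRecord` here is a hypothetical stand-in for the Java class.
* Mapping a missing key to b"" is an arbitrary choice.
* The varint length-prefixed layout is a simplified illustration of nested byte encoding, not necessarily Beam's exact KvCoder wire format.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple


@dataclass
class KafkaRecord:
    """Hypothetical stand-in for Java's KafkaRecord: payload plus Kafka metadata."""
    topic: str
    partition: int
    offset: int
    timestamp: int
    key: Optional[bytes]
    value: bytes


def to_raw_kv(record: KafkaRecord) -> Tuple[bytes, bytes]:
    """Drop the Kafka metadata, keeping only KV[bytes, bytes] (assumes None key -> b"")."""
    return (record.key or b"", record.value)


def _write_varint(n: int, out: bytearray) -> None:
    # Unsigned varint: 7 bits per byte, high bit set on every byte except the last.
    while True:
        byte = n & 0x7F
        n >>= 7
        if n:
            out.append(byte | 0x80)
        else:
            out.append(byte)
            return


def _read_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    # Returns (decoded value, position just after the varint).
    result = shift = 0
    while True:
        byte = buf[pos]
        pos += 1
        result |= (byte & 0x7F) << shift
        if not byte & 0x80:
            return result, pos
        shift += 7


def encode_kv(kv: Tuple[bytes, bytes]) -> bytes:
    """Encode key then value, each as a varint length followed by the raw bytes."""
    out = bytearray()
    for part in kv:
        _write_varint(len(part), out)
        out += part
    return bytes(out)


def decode_kv(data: bytes) -> Tuple[bytes, bytes]:
    """Inverse of encode_kv."""
    parts: List[bytes] = []
    pos = 0
    for _ in range(2):
        n, pos = _read_varint(data, pos)
        parts.append(data[pos:pos + n])
        pos += n
    return parts[0], parts[1]
```

Once the metadata is dropped, both sides only need to agree on how to encode bytes and pairs. They no longer need a Kafka-specific coder, which is the property option 2 relies on.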
--
This message was sent by Atlassian Jira
(v8.3.4#803005)