[ https://issues.apache.org/jira/browse/BEAM-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17377162#comment-17377162 ]

CannonFodder edited comment on BEAM-7870 at 7/8/21, 7:18 AM:
-------------------------------------------------------------

I encountered this problem too.

 

Environment:

CentOS 7

Anaconda

Python 3.8.8

Java 1.8.0_292

Beam 2.30.0

 

Stacktrace:

Caused by: java.lang.RuntimeException: Couldn't infer Coder from class org.apache.kafka.common.serialization.StringDeserializer
 at org.apache.beam.sdk.io.kafka.KafkaIO$Read$Builder.resolveCoder(KafkaIO.java:743)
 at org.apache.beam.sdk.io.kafka.KafkaIO$Read$Builder.buildExternal(KafkaIO.java:680)
 at org.apache.beam.sdk.io.kafka.KafkaIO$Read$Builder.buildExternal(KafkaIO.java:623)
 at org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader.lambda$knownTransforms$0(ExpansionService.java:128)

 

My current workaround is to not specify those deserializers and instead decode the raw bytes in Python with decode('utf-8'):

 
{{msg_kv_bytes = (p
    | 'ReadKafka' >> ReadFromKafka(
        consumer_config=conf,
        topics=['LaneIn'],
        max_num_records=20))  # no key_deserializer / value_deserializer args
messages = msg_kv_bytes | 'Decode' >> beam.MapTuple(
    lambda k, v: (k.decode('utf-8'), v.decode('utf-8')))}}
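The decode step above can be sketched in isolation as a plain function (the helper name decode_kv is mine, not from the pipeline; it assumes values are UTF-8 and allows for keys being absent, which Kafka records permit):

```python
def decode_kv(k, v):
    """Decode one raw (key, value) pair as emitted by ReadFromKafka
    when no deserializers are configured (both sides arrive as bytes)."""
    # Kafka records may carry no key at all; pass None through unchanged.
    key = k.decode('utf-8') if k is not None else None
    return (key, v.decode('utf-8'))
```

Using a named function instead of the lambda also makes the MapTuple step reusable: messages = msg_kv_bytes | 'Decode' >> beam.MapTuple(decode_kv).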



> Externally configured KafkaIO / PubsubIO consumer causes coder problems
> -----------------------------------------------------------------------
>
>                 Key: BEAM-7870
>                 URL: https://issues.apache.org/jira/browse/BEAM-7870
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-java-core
>            Reporter: Maximilian Michels
>            Priority: P3
>              Labels: Clarified
>
> There are limitations for the consumer to work correctly. The biggest issue 
> is the structure of KafkaIO itself, which uses a combination of the source 
> interface and DoFns to generate the desired output. The problem is that the 
> source interface is natively translated by the Flink Runner to support 
> unbounded sources in portability, while the DoFn runs in a Java environment.
> To transfer data between the two a coder needs to be involved. It happens to 
> be that the initial read does not immediately drop the KafkaRecord structure 
> which does not work together well with our current assumption of only 
> supporting "standard coders" present in all SDKs. Only the subsequent DoFn 
> converts the KafkaRecord structure into a raw KV[byte, byte], but the DoFn 
> won't have the coder available in its environment.
> There are several possible solutions:
>  1. Make the DoFn which drops the KafkaRecordCoder a native Java transform in 
> the Flink Runner
>  2. Modify KafkaIO to immediately drop the KafkaRecord structure
>  3. Add the KafkaRecordCoder to all SDKs
>  4. Add a generic coder, e.g. AvroCoder to all SDKs
> For a workaround which uses (3), please see this patch which is not a proper 
> fix but adds KafkaRecordCoder to the SDK such that it can be used to 
> encode/decode records: 
> [https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]
>  
> See also 
> [https://github.com/apache/beam/pull/8251]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
