[ https://issues.apache.org/jira/browse/BEAM-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911235#comment-16911235 ]

Maximilian Michels commented on BEAM-7870:
------------------------------------------

Should also mention that we can fix this without any change to KafkaIO by 
simply setting up the coders properly between the parts of KafkaIO which 
involve the Runner (the legacy Source API part) and the subsequent DoFns 
(e.g. the one that drops the KafkaRecord structure). Since all of this is 
Java-based, the correct coders are available.
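
For illustration, a minimal sketch of that coder setup with raw-bytes 
consumers (broker address, topic, and the exact transform shape are 
placeholders, not the actual KafkaIO expansion):

{code:java}
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaRecordCoder;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class KafkaCoderSetup {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create();

    // Legacy Source API part, natively translated by the Flink Runner:
    // pin the (Java-only) KafkaRecordCoder explicitly instead of relying
    // on coder inference across environments.
    PCollection<KafkaRecord<byte[], byte[]>> records =
        p.apply(
            KafkaIO.<byte[], byte[]>read()
                .withBootstrapServers("localhost:9092") // placeholder
                .withTopic("topic")                     // placeholder
                .withKeyDeserializer(ByteArrayDeserializer.class)
                .withValueDeserializer(ByteArrayDeserializer.class));
    records.setCoder(
        KafkaRecordCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));

    // Subsequent DoFn, running in the Java environment: drop the
    // KafkaRecord structure and pin a "standard" KvCoder on the output.
    PCollection<KV<byte[], byte[]>> kvs =
        records.apply(
            ParDo.of(
                new DoFn<KafkaRecord<byte[], byte[]>, KV<byte[], byte[]>>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(c.element().getKV());
                  }
                }));
    kvs.setCoder(KvCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));

    p.run().waitUntilFinish();
  }
}
{code}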

> Externally configured KafkaIO consumer causes coder problems
> ------------------------------------------------------------
>
>                 Key: BEAM-7870
>                 URL: https://issues.apache.org/jira/browse/BEAM-7870
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-java-core
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>
> There are limitations that prevent the consumer from working correctly. The 
> biggest issue is the structure of KafkaIO itself, which uses a combination 
> of the source interface and DoFns to generate the desired output. The 
> problem is that the source interface is natively translated by the Flink 
> Runner to support unbounded sources in portability, while the DoFn runs in 
> a Java environment. To transfer data between the two, a coder needs to be 
> involved. It happens that the initial read does not immediately drop the 
> KafkaRecord structure, which does not work well with our current assumption 
> of only supporting "standard coders" present in all SDKs. Only the 
> subsequent DoFn converts the KafkaRecord structure into a raw 
> KV<byte[], byte[]>, but the DoFn won't have the coder available in its 
> environment.
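> A minimal sketch of that unwrapping step (hypothetical class name; the 
> actual DoFn is internal to KafkaIO's expansion):
> {code:java}
> import org.apache.beam.sdk.io.kafka.KafkaRecord;
> import org.apache.beam.sdk.transforms.DoFn;
> import org.apache.beam.sdk.values.KV;
>
> // The input coder (KafkaRecordCoder) exists only in the Java SDK; the
> // output coder (KvCoder of ByteArrayCoders) is a "standard coder" that
> // all SDKs understand.
> class DropKafkaRecord
>     extends DoFn<KafkaRecord<byte[], byte[]>, KV<byte[], byte[]>> {
>   @ProcessElement
>   public void processElement(ProcessContext c) {
>     c.output(c.element().getKV());
>   }
> }
> {code}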
> There are several possible solutions:
>  1. Make the DoFn which drops the KafkaRecord structure a native Java 
> transform in the Flink Runner
>  2. Modify KafkaIO to immediately drop the KafkaRecord structure
>  3. Add the KafkaRecordCoder to all SDKs
>  4. Add a generic coder, e.g. AvroCoder, to all SDKs
> For a workaround which uses (3), please see this patch, which is not a 
> proper fix but adds KafkaRecordCoder to the SDK such that it can be used 
> to encode/decode records: 
> [https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]
>  
> See also 
> [https://github.com/apache/beam/pull/8251]



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
