[ 
https://issues.apache.org/jira/browse/BEAM-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16966364#comment-16966364
 ] 

Thomas Weise commented on BEAM-7870:
------------------------------------

Thanks for the update. I would also prefer the specific record type 
(PubsubMessage/KafkaRecord) over generic Row.

> Externally configured KafkaIO / PubsubIO consumer causes coder problems
> -----------------------------------------------------------------------
>
>                 Key: BEAM-7870
>                 URL: https://issues.apache.org/jira/browse/BEAM-7870
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-java-core
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>
> There are limitations for the consumer to work correctly. The biggest issue 
> is the structure of KafkaIO itself, which uses a combination of the source 
> interface and DoFns to generate the desired output. The problem is that the 
> source interface is natively translated by the Flink Runner to support 
> unbounded sources in portability, while the DoFn runs in a Java environment.
> To transfer data between the two a coder needs to be involved. It happens to 
> be that the initial read does not immediately drop the KafakRecord structure 
> which does not work together well with our current assumption of only 
> supporting "standard coders" present in all SDKs. Only the subsequent DoFn 
> converts the KafkaRecord structure into a raw KV[byte, byte], but the DoFn 
> won't have the coder available in its environment.
> There are several possible solutions:
>  1. Make the DoFn which drops the KafkaRecordCoder a native Java transform in 
> the Flink Runner
>  2. Modify KafkaIO to immediately drop the KafkaRecord structure
>  3. Add the KafkaRecordCoder to all SDKs
>  4. Add a generic coder, e.g. AvroCoder to all SDKs
> For a workaround which uses (3), please see this patch which is not a proper 
> fix but adds KafkaRecordCoder to the SDK such that it can be used 
> encode/decode records: 
> [https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]
>  
> See also 
> [https://github.com/apache/beam/pull/8251|https://github.com/apache/beam/pull/8251:]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to