[
https://issues.apache.org/jira/browse/BEAM-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425110#comment-17425110
]
Beam JIRA Bot commented on BEAM-7870:
-------------------------------------
This issue was marked "stale-P2" and has not received a public comment in 14
days. It is now automatically moved to P3. If you are still affected by it, you
can comment and move it back to P2.
> Externally configured KafkaIO / PubsubIO consumer causes coder problems
> -----------------------------------------------------------------------
>
> Key: BEAM-7870
> URL: https://issues.apache.org/jira/browse/BEAM-7870
> Project: Beam
> Issue Type: Bug
> Components: runner-flink, sdk-java-core
> Reporter: Maximilian Michels
> Priority: P3
> Labels: Clarified
>
> The externally configured consumer only works correctly within certain
> limitations. The biggest issue is the structure of KafkaIO itself, which
> combines the source interface with DoFns to generate the desired output. The
> Flink Runner translates the source interface natively in order to support
> unbounded sources in portability, while the DoFn runs in a Java environment,
> so a coder is needed to transfer data between the two. The initial read does
> not immediately drop the KafkaRecord structure, which conflicts with our
> current assumption of only supporting "standard coders" present in all SDKs.
> Only the subsequent DoFn converts the KafkaRecord structure into a raw
> KV<byte[], byte[]>, but the DoFn won't have the coder available in its
> environment.
> There are several possible solutions:
> 1. Make the DoFn which drops the KafkaRecordCoder a native Java transform in
> the Flink Runner
> 2. Modify KafkaIO to immediately drop the KafkaRecord structure
> 3. Add the KafkaRecordCoder to all SDKs
> 4. Add a generic coder, e.g. AvroCoder, to all SDKs
> For a workaround which uses (3), please see this patch, which is not a proper
> fix but adds KafkaRecordCoder to the SDK so that it can be used to
> encode/decode records:
> [https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]
>
> See also
> [https://github.com/apache/beam/pull/8251]
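
Solution 2 in the quoted description (drop the KafkaRecord structure right away) can be illustrated with a small, Beam-free Python sketch. Everything named here is hypothetical and not part of Beam: `KafkaRecordLike` stands in for the record structure, `STANDARD_TYPES` stands in for "types that have a standard coder in all SDKs", and `uses_only_standard_types` is a toy check for whether an element could cross the boundary between environments. It only models the idea; it does not reproduce how the Flink Runner or KafkaIO actually behave.

```python
from dataclasses import dataclass
from typing import Tuple


# Hypothetical stand-in for the record structure produced by the initial read.
@dataclass(frozen=True)
class KafkaRecordLike:
    topic: str
    partition: int
    offset: int
    key: bytes
    value: bytes


# Assumption for this sketch: only raw bytes count as "standard" everywhere.
STANDARD_TYPES = (bytes,)


def uses_only_standard_types(element) -> bool:
    """Return True if every component of the element is a standard type."""
    parts = element if isinstance(element, tuple) else (element,)
    return all(isinstance(part, STANDARD_TYPES) for part in parts)


def to_kv_bytes(record: KafkaRecordLike) -> Tuple[bytes, bytes]:
    """Drop the record metadata and keep only the raw key and value bytes."""
    return (record.key, record.value)


record = KafkaRecordLike("events", 0, 42, b"user-1", b"clicked")
kv = to_kv_bytes(record)

# The structured record is not transferable in this toy model; the raw pair is.
print(uses_only_standard_types(record), uses_only_standard_types(kv))
```

In this model the conversion has to happen before the element leaves the environment that understands the record type, which is the point of doing it immediately in the read rather than in a later DoFn.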
--
This message was sent by Atlassian Jira
(v8.3.4#803005)