Maximilian Michels created BEAM-7870:
----------------------------------------

             Summary: Externally configured KafkaIO consumer causes coder 
problems
                 Key: BEAM-7870
                 URL: https://issues.apache.org/jira/browse/BEAM-7870
             Project: Beam
          Issue Type: Bug
          Components: runner-flink, sdk-java-core
            Reporter: Maximilian Michels
            Assignee: Maximilian Michels


There are limitations that prevent the externally configured consumer from working correctly. The biggest issue is the structure of KafkaIO itself, which combines the source interface with DoFns to produce the desired output. The problem is that the Flink Runner natively translates the source interface to support unbounded sources in portability, while the DoFn runs in a Java SDK environment.

To transfer data between the two, a coder needs to be involved. As it happens, the initial read does not immediately drop the KafkaRecord structure, which conflicts with our current assumption of only supporting "standard coders" present in all SDKs. Only a subsequent DoFn converts the KafkaRecord structure into a raw KV[byte, byte], but that DoFn won't have the KafkaRecordCoder available in its environment.
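To make the coder gap concrete, here is a minimal, illustrative length-prefixed encoding for a raw KV of byte arrays, roughly the shape of data that cross-SDK "standard coders" can handle. This is only a sketch: Beam's real KvCoder/ByteArrayCoder use a different wire format, and the class name KvByteCoder is hypothetical. A KafkaRecord additionally carries topic, partition, offset, timestamp, and headers, for which no such cross-SDK encoding exists.

```java
import java.io.*;

// Illustrative only: a length-prefixed roundtrip for a KV of byte arrays.
// A KafkaRecord cannot be encoded this way without agreeing on a format
// for its extra metadata fields in every SDK.
public class KvByteCoder {
    static byte[] encode(byte[] key, byte[] value) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(key.length);   // length-prefix the key
        out.write(key);
        out.writeInt(value.length); // length-prefix the value
        out.write(value);
        return bos.toByteArray();
    }

    static byte[][] decode(byte[] encoded) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
        byte[] key = new byte[in.readInt()];
        in.readFully(key);
        byte[] value = new byte[in.readInt()];
        in.readFully(value);
        return new byte[][] { key, value };
    }

    public static void main(String[] args) throws IOException {
        byte[] encoded = encode("k".getBytes(), "v".getBytes());
        byte[][] kv = decode(encoded);
        System.out.println(new String(kv[0]) + "=" + new String(kv[1]));
    }
}
```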

There are several possible solutions:
 1. Make the DoFn which drops the KafkaRecordCoder a native Java transform in 
the Flink Runner
 2. Modify KafkaIO to immediately drop the KafkaRecord structure
 3. Add the KafkaRecordCoder to all SDKs
 4. Add a generic coder, e.g. AvroCoder to all SDKs

For a workaround which uses (3), please see this patch, which is not a proper fix but adds KafkaRecordCoder to the SDK so that it can be used to encode/decode records: [https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]

 

See also [https://github.com/apache/beam/pull/8251]



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)
