[ https://issues.apache.org/jira/browse/BEAM-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jonathan Hourany updated BEAM-7870:
-----------------------------------
Comment: was deleted
(was: I'm still having this issue in Beam 2.32.0, [as are
others|https://stackoverflow.com/questions/69010506/python-apache-beam-sdk-readfromkafka-cant-consume-the-data-error]
and [~CannonFodder]'s workaround isn't working for me. Running a small
pipeline with DirectRunner results in the following error:
{code:java}
WARNING:root:Make sure that locally built Python SDK docker image has Python
3.7 interpreter.
2.32.0: Pulling from apache/beam_java11_sdk
Digest: sha256:a45f89584071950d371966abf910869c456179ab54c7b5213e3f4e2a54bd2753
Status: Image is up to date for apache/beam_java11_sdk:2.32.0
docker.io/apache/beam_java11_sdk:2.32.0
ERROR:root:severity: ERROR
timestamp {
seconds: 1632257698
nanos: 950000000
}
message: "Client failed to deque and process the value"
trace: "org.apache.beam.sdk.util.UserCodeException:
java.lang.IllegalArgumentException: Unable to encode element
\'org.apache.beam.sdk.io.kafka.KafkaRecord@962f7b6b\' with coder
\'KafkaRecordCoder(ByteArrayCoder,ByteArrayCoder)
{code})
> Externally configured KafkaIO / PubsubIO consumer causes coder problems
> -----------------------------------------------------------------------
>
> Key: BEAM-7870
> URL: https://issues.apache.org/jira/browse/BEAM-7870
> Project: Beam
> Issue Type: Bug
> Components: runner-flink, sdk-java-core
> Reporter: Maximilian Michels
> Priority: P2
> Labels: Clarified
>
> There are limitations that prevent the consumer from working correctly. The
> biggest issue is the structure of KafkaIO itself, which uses a combination of
> the source interface and DoFns to generate the desired output. The problem is
> that the source interface is natively translated by the Flink Runner to
> support unbounded sources in portability, while the DoFn runs in a Java
> environment. To transfer data between the two, a coder needs to be involved.
> It happens to be that the initial read does not immediately drop the
> KafkaRecord structure, which does not sit well with our current assumption of
> only supporting "standard coders" present in all SDKs. Only the subsequent
> DoFn converts the KafkaRecord structure into a raw KV[byte, byte], but the
> DoFn won't have the coder available in its environment.
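> To make the data path concrete, here is a rough Java sketch of the two stages
> described above, written against the public KafkaIO API rather than the actual
> internal expansion; the broker and topic names are placeholders:
> {code:java}
> // Types come from org.apache.beam.sdk.{io.kafka, transforms, values} and
> // org.apache.kafka.common.serialization.ByteArrayDeserializer.
>
> // Stage 1: the read is translated as an unbounded source by the Flink runner
> // and still emits the KafkaRecord wrapper, so those elements need a coder
> // understood by the portability layer.
> Pipeline pipeline = Pipeline.create();
> PCollection<KafkaRecord<byte[], byte[]>> records = pipeline.apply(
>     KafkaIO.<byte[], byte[]>read()
>         .withBootstrapServers("localhost:9092")                // placeholder
>         .withTopic("topic")                                    // placeholder
>         .withKeyDeserializer(ByteArrayDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class));
>
> // Stage 2: only this DoFn, running in the Java SDK environment, drops the
> // KafkaRecord wrapper down to a plain KV<byte[], byte[]>.
> PCollection<KV<byte[], byte[]>> kvs = records.apply(
>     ParDo.of(new DoFn<KafkaRecord<byte[], byte[]>, KV<byte[], byte[]>>() {
>       @ProcessElement
>       public void processElement(ProcessContext c) {
>         c.output(c.element().getKV());
>       }
>     }));
> {code}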
> There are several possible solutions:
> 1. Make the DoFn which drops the KafkaRecordCoder a native Java transform in
> the Flink Runner
> 2. Modify KafkaIO to immediately drop the KafkaRecord structure (see the sketch after this list)
> 3. Add the KafkaRecordCoder to all SDKs
> 4. Add a generic coder, e.g. AvroCoder to all SDKs
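> Of the options above, (2) is close to what the existing Java API already lets
> a pipeline author do via {{withoutMetadata()}}: the output is a plain
> KV<byte[], byte[]>, which the standard coders present in every SDK can handle.
> This is only a sketch of the output shape, not of the cross-language
> expansion itself:
> {code:java}
> // Option (2) from a user's point of view: withoutMetadata() drops the
> // KafkaRecord wrapper so downstream stages only ever see KV<byte[], byte[]>.
> PCollection<KV<byte[], byte[]>> values = pipeline.apply(
>     KafkaIO.<byte[], byte[]>read()
>         .withBootstrapServers("localhost:9092")                // placeholder
>         .withTopic("topic")                                    // placeholder
>         .withKeyDeserializer(ByteArrayDeserializer.class)
>         .withValueDeserializer(ByteArrayDeserializer.class)
>         .withoutMetadata());
> {code}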
> For a workaround which uses (3), please see this patch, which is not a proper
> fix but adds KafkaRecordCoder to the SDK such that it can be used to
> encode/decode records:
> [https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]
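> For reference, on the Java side, where KafkaRecordCoder already exists, making
> the coder explicitly available looks roughly like the snippet below; how the
> patch above exposes the equivalent in another SDK may differ:
> {code:java}
> // Rough illustration of option (3): register KafkaRecordCoder so that
> // KafkaRecord elements can be encoded/decoded at the boundary between the
> // natively translated source and the DoFn environment.
> pipeline.getCoderRegistry().registerCoderForClass(
>     KafkaRecord.class,
>     KafkaRecordCoder.of(ByteArrayCoder.of(), ByteArrayCoder.of()));
> {code}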
>
> See also
> [https://github.com/apache/beam/pull/8251]
--
This message was sent by Atlassian Jira
(v8.3.4#803005)