[ https://issues.apache.org/jira/browse/BEAM-7870?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16911560#comment-16911560 ]

Chad Dombrova edited comment on BEAM-7870 at 8/20/19 10:04 PM:
---------------------------------------------------------------

To follow up from the ML thread, I'm having a very similar problem when 
converting PubsubIO to an external transform.

{quote}
To transfer data between the two a coder needs to be involved. It happens to be 
that the initial read does not immediately drop the KafkaRecord structure which 
does not work together well with our current assumption of only supporting 
"standard coders" present in all SDKs. Only the subsequent DoFn converts the 
KafkaRecord structure into a raw KV[byte, byte], but the DoFn won't have the 
coder available in its environment.
{quote}

PubsubIO behaves much the same way as you've described -- a Read produces a 
PubsubMessage structure which is converted to bytes (in protobuf format) by a 
subsequent DoFn -- but the problem I'm seeing is not that "the DoFn won't have 
the coder available in its environment"; it's that the Read does not use the 
proper coder, because FlinkStreamingPortablePipelineTranslator.translateRead 
replaces all non-standard coders with ByteArrayCoder.  So in my tests the job 
fails on the first message read, complaining that a PubsubMessage cannot be 
cast to a byte array.
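
To make the failure mode concrete, here is a tiny repro-shaped sketch of what I 
believe is happening (illustrative only, not the translator's actual code path): 
the element handed to the substituted coder is still an SDK PubsubMessage, so 
the encode step is effectively doing this:

{code:java}
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

// Illustrative sketch only: if the runner substitutes ByteArrayCoder for the
// transform's declared coder, the first PubsubMessage that reaches it fails,
// because the element is not a byte[].
public class CoderSubstitutionRepro {
  public static void main(String[] args) throws Exception {
    Object element = new PubsubMessage(
        "payload".getBytes(StandardCharsets.UTF_8), Collections.emptyMap());
    ByteArrayCoder substituted = ByteArrayCoder.of();
    // Throws ClassCastException: PubsubMessage cannot be cast to byte[]
    substituted.encode((byte[]) element, new ByteArrayOutputStream());
  }
}
{code}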

Some thoughts on the proposed solutions:

{quote}
1. Make the DoFn which drops the KafkaRecordCoder a native Java transform in 
the Flink Runner
{quote}

I don't see how this will help PubsubIO, because, as I stated, I'm fairly 
confident the error happens when Flink tries to store the pubsub message 
immediately after it's been read, before the DoFn is even called.  

{quote}
2. Modify KafkaIO to immediately drop the KafkaRecord structure
{quote}

Superficially this is the simplest solution, but it's severely limited in 
usefulness.  In PubsubIO, the next transform is actually a DoFn that wants a 
PubsubMessage in order to update some stats metrics (see my overview of the 
pipeline below).  This solution would either prevent that from working or 
require every downstream transform to decode and re-encode the payload itself, 
which I think is self-defeating.
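
To illustrate why I think that's self-defeating: if the Read immediately emitted 
raw bytes, a downstream stats transform would be forced into something like the 
following (a hypothetical DoFn, assuming the payload is a serialized 
com.google.pubsub.v1.PubsubMessage proto):

{code:java}
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical sketch: every consumer has to re-parse the raw bytes just to
// look at the message attributes, then re-emit the same bytes unchanged.
class StatsFn extends DoFn<byte[], byte[]> {
  @ProcessElement
  public void process(@Element byte[] raw, OutputReceiver<byte[]> out) throws Exception {
    com.google.pubsub.v1.PubsubMessage msg =
        com.google.pubsub.v1.PubsubMessage.parseFrom(raw);  // decode just to inspect
    // ... update stats metrics from msg.getAttributesMap() ...
    out.output(raw);  // pass the same bytes along
  }
}
{code}

Every transform that cares about attributes pays that decode/encode cost, which 
is exactly the work a coder is supposed to centralize.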

{quote}
3. Add the KafkaRecordCoder to all SDKs
{quote}

Thanks for the patch example.  I'll see if something similar works for PubsubIO.

{quote}
4. Add a generic coder, e.g. AvroCoder to all SDKs
{quote}

What about protobufs as a compromise between options 3 and 4?  

In the case of PubsubIO and PubsubMessages, Java has its own native coder 
(actually a pair, one for messages w/ attributes and one for w/out), while 
Python uses protobufs.  It would be simpler to write a protobuf coder for 
Java than to write a Python version of PubsubMessageWithAttributesCoder, so I 
would favor protobufs as a pattern to apply generally to this kind of 
portability problem.

The other thing I like about a protobuf-based approach is that you could write 
an abstract ProtoCoder base class that can be specialized simply by providing 
the native class and the proto class.
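
Something like this is what I have in mind (a minimal sketch only, with 
hypothetical names; it uses length-delimited proto serialization so nested 
encoding stays simple):

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

import com.google.protobuf.Message;
import org.apache.beam.sdk.coders.CustomCoder;

// Sketch of an abstract coder that round-trips a native type T through a
// protobuf message type ProtoT. Subclasses only supply the conversions.
public abstract class ProtoBackedCoder<T, ProtoT extends Message> extends CustomCoder<T> {

  /** Convert the native object into its proto representation. */
  protected abstract ProtoT toProto(T value);

  /** Convert the proto representation back into the native object. */
  protected abstract T fromProto(ProtoT proto);

  /** Parse the proto from the stream, e.g. SomeProto.parseDelimitedFrom(in). */
  protected abstract ProtoT parseDelimitedFrom(InputStream in) throws IOException;

  @Override
  public void encode(T value, OutputStream out) throws IOException {
    // Length-delimited so the element can sit inside a larger encoded stream.
    toProto(value).writeDelimitedTo(out);
  }

  @Override
  public T decode(InputStream in) throws IOException {
    return fromProto(parseDelimitedFrom(in));
  }
}
{code}

The same base class should work for any IO whose element type already has a 
proto representation in the other SDKs.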

My current external pubsub test pipeline looks like this after it's been 
expanded:

{noformat}
--- java ---
Read<PBegin, PubsubMessage>
ParDo<PubsubMessage, PubsubMessage>  # stats
ParDo<PubsubMessage, byte[]>  # convert to pubsub protobuf byte array
--- python ---
ParDo<byte[], PubsubMessage>  # convert from pubsub protobuf byte array
ParDo<PubsubMessage, ...>  # custom logger
{noformat}

If we created and registered a protobuf-based coder for PubsubMessage (a sketch 
of such a coder follows the diagram), we could cut out the converter transforms, 
and it would look like this:

{noformat}
--- java ---
Read<PBegin, PubsubMessage>
ParDo<PubsubMessage, PubsubMessage>  # stats
--- python ---
ParDo<PubsubMessage, ...>  # custom logger
{noformat}
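
For PubsubIO specifically, the coder I'm imagining would look roughly like this 
(hypothetical class, building on the ProtoBackedCoder sketch above; method names 
on the SDK and proto classes are from memory):

{code:java}
import java.io.IOException;
import java.io.InputStream;

import com.google.protobuf.ByteString;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;

// Hypothetical coder mapping the Java SDK's PubsubMessage to the
// com.google.pubsub.v1.PubsubMessage proto that Python already uses.
// (Null-handling of the attribute map is omitted for brevity.)
class PubsubMessageProtoCoder
    extends ProtoBackedCoder<PubsubMessage, com.google.pubsub.v1.PubsubMessage> {

  @Override
  protected com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage value) {
    return com.google.pubsub.v1.PubsubMessage.newBuilder()
        .setData(ByteString.copyFrom(value.getPayload()))
        .putAllAttributes(value.getAttributeMap())
        .build();
  }

  @Override
  protected PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage proto) {
    return new PubsubMessage(proto.getData().toByteArray(), proto.getAttributesMap());
  }

  @Override
  protected com.google.pubsub.v1.PubsubMessage parseDelimitedFrom(InputStream in)
      throws IOException {
    return com.google.pubsub.v1.PubsubMessage.parseDelimitedFrom(in);
  }
}
{code}

Registering that as the default coder for PubsubMessage (e.g. via the pipeline's 
CoderRegistry) is what would let us drop the byte[] conversion ParDos above.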

That shortened pipeline is appealing to me, though it's worth noting that we 
would get this benefit from any cross-language coder, whether it's protobuf, 
Avro, or just a Python rewrite of PubsubMessageWithAttributesCoder.



> Externally configured KafkaIO consumer causes coder problems
> ------------------------------------------------------------
>
>                 Key: BEAM-7870
>                 URL: https://issues.apache.org/jira/browse/BEAM-7870
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink, sdk-java-core
>            Reporter: Maximilian Michels
>            Assignee: Maximilian Michels
>            Priority: Major
>
> There are limitations that keep the consumer from working correctly. The biggest issue 
> is the structure of KafkaIO itself, which uses a combination of the source 
> interface and DoFns to generate the desired output. The problem is that the 
> source interface is natively translated by the Flink Runner to support 
> unbounded sources in portability, while the DoFn runs in a Java environment.
> To transfer data between the two a coder needs to be involved. It happens to 
> be that the initial read does not immediately drop the KafkaRecord structure 
> which does not work together well with our current assumption of only 
> supporting "standard coders" present in all SDKs. Only the subsequent DoFn 
> converts the KafkaRecord structure into a raw KV[byte, byte], but the DoFn 
> won't have the coder available in its environment.
> There are several possible solutions:
>  1. Make the DoFn which drops the KafkaRecordCoder a native Java transform in 
> the Flink Runner
>  2. Modify KafkaIO to immediately drop the KafkaRecord structure
>  3. Add the KafkaRecordCoder to all SDKs
>  4. Add a generic coder, e.g. AvroCoder to all SDKs
> For a workaround which uses (3), please see this patch which is not a proper 
> fix but adds KafkaRecordCoder to the SDK such that it can be used to 
> encode/decode records: 
> [https://github.com/mxm/beam/commit/b31cf99c75b3972018180d8ccc7e73d311f4cfed]
>  
> See also [https://github.com/apache/beam/pull/8251]


