To clarify my previous point, I think the KafkaIO.Read.TypedWithoutMetadata
transform [1], which produces a KV<K, V> (for example, KV<bytes, bytes> if we
use ByteArrayDeserializer for keys and values), should work in its current
form as long as the runner does not have a runner-specific override for the
source (which allows the source and the subsequent DoFn to fuse).
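To make the KV<bytes, bytes> point concrete: a pair of byte strings can be encoded in a form that any SDK can decode without knowing any Java class. The snippet below is a toy sketch loosely modeled on a varint length-prefixed bytes encoding. The function names are hypothetical, and this is not Beam's actual coder code.

```python
from typing import Tuple


def encode_varint(n: int) -> bytes:
    """Encode a non-negative int as an unsigned base-128 varint."""
    out = bytearray()
    while True:
        low7 = n & 0x7F
        n >>= 7
        if n:
            out.append(low7 | 0x80)  # high bit set: more bytes follow
        else:
            out.append(low7)
            return bytes(out)


def decode_varint(buf: bytes, pos: int) -> Tuple[int, int]:
    """Decode a varint at pos; return (value, position after it)."""
    result, shift = 0, 0
    while True:
        b = buf[pos]
        pos += 1
        result |= (b & 0x7F) << shift
        if not b & 0x80:
            return result, pos
        shift += 7


def encode_kv(key: bytes, value: bytes) -> bytes:
    """Toy KV<bytes, bytes> encoding: length-prefixed key, then value."""
    return encode_varint(len(key)) + key + encode_varint(len(value)) + value


def decode_kv(buf: bytes) -> Tuple[bytes, bytes]:
    """Inverse of encode_kv; needs no knowledge of any Java class."""
    klen, pos = decode_varint(buf, 0)
    key = buf[pos:pos + klen]
    pos += klen
    vlen, pos = decode_varint(buf, pos)
    return key, buf[pos:pos + vlen]


encoded = encode_kv(b"user-1", b"clicked")
print(encoded)             # b'\x06user-1\x07clicked'
print(decode_kv(encoded))  # (b'user-1', b'clicked')
```

Because both halves are plain bytes with explicit lengths, the receiving side only needs to agree on this framing, not on any language-specific type.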

I think the issue Chad is running into is due to the Flink runner having a
transform override for the source, which prevents fusion between the source
and the subsequent DoFn and hence forces data to be serialized across the Fn
API boundary. An SDF-based Kafka source that works on the Fn API without
runner overrides would not run into this issue.
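A toy model of the fusion argument, assuming a hypothetical rule that only bytes and (bytes, bytes) pairs have a coder both SDKs understand. KafkaRecordLike, strip_metadata, cross_boundary and run are made-up names for illustration, not KafkaIO or runner APIs. When the source and the metadata-stripping step are fused, only the KV has to be encoded; when an override splits them, the metadata-carrying record must cross the boundary and fails.

```python
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class KafkaRecordLike:
    """Hypothetical stand-in for a record type that only the Java SDK knows."""
    topic: str
    offset: int
    key: bytes
    value: bytes


def has_portable_coder(element: Any) -> bool:
    """Toy rule: only bytes and (bytes, bytes) pairs can cross the boundary."""
    if isinstance(element, bytes):
        return True
    return (type(element) is tuple and len(element) == 2
            and all(isinstance(x, bytes) for x in element))


def cross_boundary(element: Any) -> None:
    """Stand-in for encoding an element to send it across the Fn API."""
    if not has_portable_coder(element):
        raise TypeError(f"no portable coder for {type(element).__name__}")


def strip_metadata(rec: KafkaRecordLike) -> tuple:
    """Plays the role of the step that drops metadata and keeps (key, value)."""
    return (rec.key, rec.value)


def run(source_output: List[Any], stages: List[Callable[[Any], Any]],
        split_before: Optional[int] = None) -> List[Any]:
    """Run stages per element; split_before marks a fusion break."""
    results = []
    for element in source_output:
        for i, stage in enumerate(stages):
            if i == split_before:
                cross_boundary(element)  # stages are not fused here
            element = stage(element)
        cross_boundary(element)  # the fused stage's output leaves it anyway
        results.append(element)
    return results


records = [KafkaRecordLike("events", 0, b"k1", b"v1")]
print(run(records, [strip_metadata]))  # [(b'k1', b'v1')]
try:
    run(records, [strip_metadata], split_before=0)
except TypeError as err:
    print(err)  # no portable coder for KafkaRecordLike
```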

And we already have a Python wrapper [2] for the
KafkaIO.Read.TypedWithoutMetadata transform, which should work out of the
box once we have the automatic SDF converter [3], as long as the runner
supports SDF.

We might still want to fix the coder issue so that KafkaIO.Read transform
can be directly used as a cross-language transform instead
of KafkaIO.Read.TypedWithoutMetadata.
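The RowCoder / explicit-conversion idea quoted further down amounts to mapping a language-specific record onto a schema whose fields all have portable encodings. A minimal Python sketch of that shape, assuming hypothetical HeadersLike, KafkaRecordLike and KafkaRow types and a hypothetical set of portable field types; the actual proposal concerns registering coders on the Java side.

```python
from dataclasses import dataclass
from typing import NamedTuple, get_type_hints


class HeadersLike:
    """Hypothetical Java-only type with no portable encoding."""


@dataclass
class KafkaRecordLike:
    """Hypothetical stand-in for a Kafka record carrying metadata."""
    topic: str
    partition: int
    offset: int
    timestamp_millis: int
    key: bytes
    value: bytes
    headers: HeadersLike


class KafkaRow(NamedTuple):
    """Hypothetical row schema built only from portable field types."""
    topic: str
    partition: int
    offset: int
    timestamp_millis: int
    key: bytes
    value: bytes


# Assumption: these field types have an encoding every SDK understands.
PORTABLE_FIELD_TYPES = {str, int, bytes}


def schema_is_portable(row_type: type) -> bool:
    """True if every declared field of row_type uses a portable type."""
    return all(t in PORTABLE_FIELD_TYPES
               for t in get_type_hints(row_type).values())


def to_row(rec: KafkaRecordLike) -> KafkaRow:
    """Explicit conversion; headers are dropped here only for brevity."""
    return KafkaRow(rec.topic, rec.partition, rec.offset,
                    rec.timestamp_millis, rec.key, rec.value)


rec = KafkaRecordLike("events", 3, 42, 1582178940000, b"k", b"v", HeadersLike())
print(schema_is_portable(KafkaRecordLike))  # False
print(schema_is_portable(KafkaRow))         # True
print(to_row(rec).offset)                   # 42
```

A real conversion would probably map headers to a portable field rather than dropping them.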

Thanks,
Cham

[1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1105
[2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L60
[3] https://github.com/apache/beam/pull/10897

On Wed, Feb 19, 2020 at 10:09 PM Robert Bradshaw wrote:

> Ah, yes, registering a RowCoder seems like a fine solution here.
> (Either that or have a wrapping PTransform that explicitly converts to
> Rows or similar.)
>
> On Wed, Feb 19, 2020 at 10:03 PM Chad Dombrova wrote:
> >
> > The Java deps are only half of the problem. The other half is that
> > PubsubIO and KafkaIO are using classes that do not have a Python
> > equivalent and thus no universal coder. The solution discussed in the
> > issue I linked above was to use row coder registries in Java to convert
> > from these types to rows / schemas.
> >
> > Any thoughts on that?
> >
> > -chad
> >
> >
> > On Wed, Feb 19, 2020 at 6:00 PM Robert Bradshaw wrote:
> >>
> >> Hopefully this should be resolved by
> >> https://issues.apache.org/jira/browse/BEAM-9229
> >>
> >> On Wed, Feb 19, 2020 at 5:52 PM Chad Dombrova wrote:
> >> >
> >> > We are using external transforms to get access to PubsubIO within
> >> > Python. It works well, but there is one major issue remaining to fix:
> >> > we have to build a custom Beam with a hack to add the PubsubIO Java
> >> > deps and fix up the coders. This affects KafkaIO as well. There's an
> >> > issue here: https://issues.apache.org/jira/browse/BEAM-7870
> >> >
> >> > I consider this to be the most pressing problem with external
> >> > transforms right now.
> >> >
> >> > -chad
> >> >
> >> >
> >> >
> >> > On Wed, Feb 12, 2020 at 9:28 AM Chamikara Jayalath wrote:
> >> >>
> >> >>
> >> >>
> >> >> On Wed, Feb 12, 2020 at 8:10 AM Alexey Romanenko wrote:
> >> >>>
> >> >>>
> >> >>>> AFAIK, there's no official guide for cross-language pipelines, but
> >> >>>> there are examples and test cases you can use as a reference, such as:
> >> >>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang.py
> >> >>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
> >> >>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
> >> >>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service_test.py
> >> >>>
> >> >>>
> >> >>> I'm trying to work with tech writers to add more documentation
> >> >>> related to cross-language pipelines (in a few months), but any help
> >> >>> documenting what we have now is greatly appreciated.
> >> >>>
> >> >>>
> >> >>> That would be great, since the information is currently scattered
> >> >>> across different places. I’d be happy to contribute the examples I
> >> >>> hope to have after a while, and to help test them.
> >> >>
> >> >>
> >> >> Great.
> >> >>
> >> >>>
> >> >>>
> >> >>>> I would say runner and SDK support is in a working state, but not
> >> >>>> many IOs expose a cross-language interface yet (you can easily
> >> >>>> write a cross-language configuration for any Python transform
> >> >>>> yourself, though).
> >> >>>
> >> >>>
> >> >>> I should mention here the test suites for portable Flink and Spark
> >> >>> that Heejong added recently :)
> >> >>>
> >> >>> https://builds.apache.org/view/A-D/view/Beam/view/PostCommit/job/beam_PostCommit_XVR_Flink/
> >> >>> https://builds.apache.org/view/A-D/view/Beam/view/PostCommit/job/beam_PostCommit_XVR_Spark/
> >> >>>
> >> >>>
> >> >>> Nice! Looks like my question above about cross-language support in
> >> >>> the Spark runner was redundant.
> >> >>>
> >> >>>
> >> >>>>
> >> >>>>
> >> >>>>>
> >> >>>>> - Is the information here
> >> >>>>> https://beam.apache.org/roadmap/connectors-multi-sdk/ up-to-date?
> >> >>>>> Are there any other entry points you can recommend?
> >> >>>>
> >> >>>>
> >> >>>> I think it's up-to-date.
> >> >>>
> >> >>>
> >> >>> Mostly up to date. Testing status is more complete now, and we are
> >> >>> actively working on getting the dependencies story correct and on
> >> >>> adding support for DataflowRunner.
> >> >>>
> >> >>>
> >> >>> Are there any “umbrella” JIRAs regarding cross-language support
> >> >>> that I can track?
> >> >>
> >> >>
> >> >> I don't think we have an umbrella JIRA currently. I can create one
> >> >> and mention it in the roadmap.
>
