To clarify my previous point, I think the KafkaIO.Read.TypedWithoutMetadata transform [1], which produces a KV<K, V> (for example KV<bytes, bytes> if we use ByteArrayDeserializer for keys and values), should work in its current form if we don't have a runner-specific override for the source (hence allowing the source and the subsequent DoFn to fuse).
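
To make the fusion point concrete, here is a minimal plain-Python sketch. It is not Beam code: RecordWithMetadata, strip_metadata, cross_boundary, run_fused and run_unfused are hypothetical names made up for illustration, and the length-prefixed encoding is only a stand-in for a coder that every SDK understands. The assumption it models is that only a (bytes, bytes) pair has a portable encoding, so the element type at the point where data leaves the source's SDK decides whether things work.

```python
import struct
from typing import Iterable, List, Tuple


class RecordWithMetadata:
    """Hypothetical source-side record type with no portable coder."""

    def __init__(self, key: bytes, value: bytes, offset: int):
        self.key = key
        self.value = value
        self.offset = offset


def strip_metadata(record: RecordWithMetadata) -> Tuple[bytes, bytes]:
    """Stand-in for the DoFn that follows the source and drops metadata."""
    return (record.key, record.value)


def encode_kv(kv: Tuple[bytes, bytes]) -> bytes:
    """Length-prefixed encoding of a (key, value) pair (illustrative only)."""
    key, value = kv
    return struct.pack(">I", len(key)) + key + struct.pack(">I", len(value)) + value


def decode_kv(data: bytes) -> Tuple[bytes, bytes]:
    """Inverse of encode_kv."""
    (klen,) = struct.unpack_from(">I", data, 0)
    key = data[4:4 + klen]
    (vlen,) = struct.unpack_from(">I", data, 4 + klen)
    start = 8 + klen
    return key, data[start:start + vlen]


def cross_boundary(element):
    """Simulate handing one element to another SDK.

    Assumption of this sketch: only (bytes, bytes) pairs can be encoded.
    """
    is_kv = (
        isinstance(element, tuple)
        and len(element) == 2
        and all(isinstance(part, bytes) for part in element)
    )
    if not is_kv:
        raise TypeError(f"no portable coder for {type(element).__name__}")
    return decode_kv(encode_kv(element))


def run_fused(records: Iterable[RecordWithMetadata]) -> List[Tuple[bytes, bytes]]:
    # Source and strip_metadata share a stage, so only the KV output crosses.
    return [cross_boundary(strip_metadata(r)) for r in records]


def run_unfused(records: Iterable[RecordWithMetadata]) -> List[Tuple[bytes, bytes]]:
    # The source output itself must cross before strip_metadata can run.
    return [strip_metadata(cross_boundary(r)) for r in records]
```

In this sketch run_fused succeeds because only the key/value pair is ever encoded, while run_unfused raises TypeError as soon as the metadata-carrying record has to be serialized.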

I think the issues Chad is running into are due to the Flink runner having a transform override for the source, which prevents fusion between the source and the subsequent DoFn, so data has to be serialized across the Fn API boundary. An SDF-based Kafka source that can work on the Fn API without runner overrides will not run into this issue. We also already have a Python wrapper [2] for the KafkaIO.Read.TypedWithoutMetadata transform, which should work out of the box once we have the automatic SDF converter [3], as long as the runner supports SDF.

We might still want to fix the coder issue so that the KafkaIO.Read transform can be used directly as a cross-language transform instead of KafkaIO.Read.TypedWithoutMetadata.

Thanks,
Cham

[1] https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L1105
[2] https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/external/kafka.py#L60
[3] https://github.com/apache/beam/pull/10897

On Wed, Feb 19, 2020 at 10:09 PM Robert Bradshaw <[email protected]> wrote:

> Ah, yes, registering a RowCoder seems like a fine solution here.
> (Either that or have a wrapping PTransform that explicitly converts to
> Rows or similar.)
>
> On Wed, Feb 19, 2020 at 10:03 PM Chad Dombrova <[email protected]> wrote:
> >
> > The Java deps are only half of the problem. The other half is that PubsubIO and KafkaIO are using classes that do not have a Python equivalent and thus no universal coder. The solution discussed in the issue I linked above was to use row coder registries in Java, to convert from these types to rows / schemas.
> >
> > Any thoughts on that?
> >
> > -chad
> >
> > On Wed, Feb 19, 2020 at 6:00 PM Robert Bradshaw <[email protected]> wrote:
> >>
> >> Hopefully this should be resolved by
> >> https://issues.apache.org/jira/browse/BEAM-9229
> >>
> >> On Wed, Feb 19, 2020 at 5:52 PM Chad Dombrova <[email protected]> wrote:
> >> >
> >> > We are using external transforms to get access to PubsubIO within Python. It works well, but there is one major issue remaining to fix: we have to build a custom Beam with a hack to add the PubsubIO Java deps and fix up the coders. This affects KafkaIO as well. There's an issue here: https://issues.apache.org/jira/browse/BEAM-7870
> >> >
> >> > I consider this to be the most pressing problem with external transforms right now.
> >> >
> >> > -chad
> >> >
> >> > On Wed, Feb 12, 2020 at 9:28 AM Chamikara Jayalath <[email protected]> wrote:
> >> >>
> >> >> On Wed, Feb 12, 2020 at 8:10 AM Alexey Romanenko <[email protected]> wrote:
> >> >>>
> >> >>>> AFAIK, there's no official guide for cross-language pipelines. But there are examples and test cases you can use as reference such as:
> >> >>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount_xlang.py
> >> >>>> https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOExternalTest.java
> >> >>>> https://github.com/apache/beam/blob/master/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/ValidateRunnerXlangTest.java
> >> >>>> https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/portability/expansion_service_test.py
> >> >>>
> >> >>> I'm trying to work with tech writers to add more documentation related to cross-language (in a few months). But any help related to documenting what we have now is greatly appreciated.
> >> >>>
> >> >>> That would be great, since the information is currently a bit scattered over different places. I'd be happy to help with any examples and their testing, which I hope I'll have after a while.
> >> >>
> >> >> Great.
> >> >>
> >> >>>
> >> >>>> Runner and SDK support is in a working state, I would say, but not many IOs expose their cross-language interface yet (you can easily write cross-language configuration for any Python transforms by yourself though).
> >> >>>
> >> >>> Should mention here the test suites for portable Flink and Spark Heejong added recently :)
> >> >>>
> >> >>> https://builds.apache.org/view/A-D/view/Beam/view/PostCommit/job/beam_PostCommit_XVR_Flink/
> >> >>> https://builds.apache.org/view/A-D/view/Beam/view/PostCommit/job/beam_PostCommit_XVR_Spark/
> >> >>>
> >> >>> Nice! Looks like my question above about cross-language support in the Spark runner was redundant.
> >> >>>
> >> >>>>>
> >> >>>>> - Is the information here https://beam.apache.org/roadmap/connectors-multi-sdk/ up-to-date? Are there any other entry points you can recommend?
> >> >>>>
> >> >>>> I think it's up-to-date.
> >> >>>
> >> >>> Mostly up to date. Testing status is more complete now and we are actively working on getting the dependencies story correct and adding support for DataflowRunner.
> >> >>>
> >> >>> Are there any "umbrella" Jiras regarding cross-language support that I can track?
> >> >>
> >> >> I don't think we have an umbrella JIRA currently. I can create one and mention it in the roadmap.
>
