Hi Chad! Thank you so much for your feedback. You are 100% on the right track. What you are seeing is a core issue that also needs to be solved for KafkaIO to be fully usable in other SDKs. I haven't had much time to work on this in the past few weeks, but now is the time :)
The cross-language implementation recycles source connectors (like KafkaIO.Read) which use the legacy Source interface. That interface no longer exists in portability. The current portability architecture assumes that UDFs (and this includes KafkaIO.Read) run in an environment, not directly on the Runner. This causes issues when a connector comes with multiple transforms, some of which run with an environment and some without, e.g. KafkaIO.Read or PubSubIO.Read.

The issue is also tracked here: https://jira.apache.org/jira/browse/BEAM-7870

There are some suggestions in the issue. I think the best solution is to allow the Source API parts of KafkaIO/PubSubIO to execute on the Runner, and the UDFs that follow them to execute in the environment. Since those do not cross the language boundary, it is simply a matter of setting this up correctly. (See the P.S. below for a sketch of the pure-Java pipeline discussed here.)

-Max

On 20.08.19 03:45, Chad Dombrova wrote:
>>> I don't understand why this replacement is necessary, since
>>> the next transform in the chain is a java ParDo that seems
>>> like it should be fully capable of using
>>> PubsubMessageWithAttributesCoder.
>>
>> Not too familiar with Flink, but have you tried using PubSub source
>> from a pure Java Flink pipeline? I expect this to work but haven't
>> tested it. If that works, I agree that the coder replacement sounds
>> strange.
>
> Just spoke with my co-worker and he confirmed that Pubsub Read works in
> pure Java on Flink. We're going to test sending the same Java pipeline
> through the portable runner next. Our hypothesis is that it will /not/
> work. If it /does/, then that means it's something specific to how the
> transforms are created when using the expansion service.
>
> -chad
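P.S. For reference, a minimal sketch of the pure-Java pipeline Chad
describes, the way I would expect it to look. The class name and the
project/subscription names are placeholders, and I haven't run this
exact snippet:

import java.nio.charset.StandardCharsets;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class PubsubReadOnFlink {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(
        PipelineOptionsFactory.fromArgs(args).withValidation().create());

    p.apply("ReadPubsub",
            // Emits PubsubMessage elements; the output collection is encoded
            // with PubsubMessageWithAttributesCoder, the coder under discussion.
            PubsubIO.readMessagesWithAttributes()
                .fromSubscription("projects/my-project/subscriptions/my-sub"))
     .apply("ExtractPayload",
            // The "next transform in the chain": a plain Java transform that
            // consumes the messages directly.
            MapElements.into(TypeDescriptors.strings())
                .via((PubsubMessage m) ->
                    new String(m.getPayload(), StandardCharsets.UTF_8)));

    p.run();
  }
}

Run with --runner=FlinkRunner, this exercises the same PubsubIO.Read and
coder setup as the portable pipeline, just without the expansion service
in between.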
