On Fri, Apr 27, 2018 at 12:18 PM Thomas Weise <t...@apache.org> wrote:
> The ability to specify with URN and implement custom transforms is also important. Such transforms may not qualify for inclusion in Beam for a variety of reasons (only relevant for a specific environment or use case, dependencies/licensing, ...).

They don't need to be included in Beam - by design, a third party library transform can specify its own URN and Payload to be put in the proto representation. I'm not sure of the state of the code here, but I think the current path is a shared dep on runners-core-construction and some ServiceLoader shenanigans. Shading may be in place that breaks this.

Kenn

> For my specific experiment, I prefer the custom URN over trying to bend the implementation to mimic an SDF-based KafkaIO that it wouldn't (and doesn't need to) be semantically equivalent to. At this point Beam doesn't have the spec and implementation for said KafkaIO, but it would be great to see an example of how it would look. Following a Beam spec would absolutely make sense if the custom implementation is purely for optimization or a similar purpose.

> I wanted to circle back to the coder-related question. I see that we now have a proto definition for the standard transforms and coders, which is really nice:

> https://github.com/apache/beam/blob/42fac771814b119c162d40e9300f5a0d3afe0f48/model/pipeline/src/main/proto/beam_runner_api.proto#L521

> This enables interoperability between languages with some standard types (KV, ITERABLE etc.), but for a structure like KafkaRecord a custom coder would be required, implemented in both Java and Python. Any thoughts on providing a generic tuple/record coder as part of the spec?

> Thanks,
> Thomas

> On Fri, Apr 27, 2018 at 8:53 AM, Lukasz Cwik <lc...@google.com> wrote:

>> On Thu, Apr 26, 2018 at 8:38 PM Chamikara Jayalath <chamik...@google.com> wrote:

>>> On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <kirpic...@google.com> wrote:

>>>> I agree with Thomas' sentiment that cross-language IO is very important because of how much work it takes to produce a mature connector implementation in a language. Looking at implementations of BigQueryIO, PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted to reimplement them entirely in Python and Go.

>>>> I'm imagining pretty much what Kenn is describing: a pipeline would specify some transforms by URN + payload, and rely on the runner to do whatever it takes to run this - either by expanding it into a Beam implementation of this transform that the runner chooses to use (could be in the same language or in a different language; either way, the runner would indeed need to invoke the respective SDK to expand it given the parameters), or by doing something entirely runner-specific (e.g. using the built-in Flink Kafka connector).

>>>> I don't see a reason to require that there *must* exist a Beam implementation of this transform. Ideally, there only needs to be a runner- and language-agnostic spec for the URN and payload; of course, then the transform is only as portable as the set of runners that implement this URN.

>>> For a transform in general it's true that we don't need a Beam implementation, but more specifically for IOs I think there are many benefits to having the implementation in Beam. For example,

>>> - IO connector will offer the same behavior and feature set across various runners/SDKs.
>>> - Beam community will be able to view/modify/improve the IO connector.
>>> - Existing IO connectors will serve as examples for users who wish to develop new IO connectors.

>> - More runners will be able to execute the user's pipeline.

>>>> I actually really like the idea that the transform can be implemented in a completely runner-specific way without a Beam expansion to back it up - it would let us unblock a lot of the work before full-blown cross-language IO is delivered, or even before SDFs work in all languages/runners.

>>> If there are existing established connectors (for example, Kafka for Flink in this case) I agree. But for anybody developing a new IO connector, I think we should encourage developing that in Beam (in some SDK) given that the connector will be available to all runners (and to all SDKs once we have cross-language transforms).

>>> Thanks,
>>> Cham

>>>> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <k...@google.com> wrote:

>>>>> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector, right? I was imagining: the Python SDK submits a pipeline with a KafkaIO (with URN + payload), maybe with bogus contents. It is replaced with a small Flink subgraph, including the native Flink Kafka connector and some compensating transforms to match the required semantics. To me, this is preferable to making single-runner transform URNs, since that breaks runner portability by definition.

>>>>> Kenn

>>>>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <chamik...@google.com> wrote:

>>>>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:

>>>>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <k...@google.com> wrote:

>>>>>>>> The premise of URN + payload is that you can establish a spec. A native override still needs to meet the spec - it may still require some compensating code. Worrying about weird differences between runners seems more about worrying that an adequate spec cannot be determined.

>>>>>>> My point exactly. An SDF-based KafkaIO can run in the middle of a pipeline. E.g. we could have TextIO producing a list of topics, and the SDF KafkaIO run after that on this dynamic (not known until runtime) list of topics. If the native Flink source doesn't work this way, then it doesn't share the same spec and should have a different URN.

>>>>>> Agree that if they cannot share the same spec, SDF and native transforms warrant different URNs. Native Kafka might be able to support a PCollection of topics/partitions as an input, though, by utilizing the underlying native Flink Kafka connector as a library. On the other hand, we might decide to expand SDF-based ParDos into other transforms before a runner gets a chance to override, in which case this kind of replacement will not be possible.

>>>>>> Thanks,
>>>>>> Cham

>>>>>>>> Runners will already invoke the SDF differently, so users treating every detail of some implementation as the spec are doomed.
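
To make the URN + payload idea concrete, here is a rough Python-side sketch of such a replaceable transform. Everything in it is illustrative - the transform name, the URN, the JSON payload, and the assumption that to_runner_api_parameter is the right hook for attaching a custom URN + payload in the Python SDK. The default expansion just fails loudly (as suggested in the quoted discussion below), so a runner that does not recognize the URN cannot silently do the wrong thing:

import json

import apache_beam as beam

# Illustrative URN; a real one would be pinned down by a spec.
KAFKA_READ_URN = 'custom:flink:kafka_read:v1'


class ReadFromKafka(beam.PTransform):
  """Kafka read identified by URN + payload (illustrative sketch).

  The default expansion fails loudly; a runner that recognizes
  KAFKA_READ_URN (e.g. the portable Flink runner) is expected to replace
  this transform with a native, semantically equivalent subgraph.
  """

  def __init__(self, topics, bootstrap_servers):
    super(ReadFromKafka, self).__init__()
    self._topics = topics
    self._bootstrap_servers = bootstrap_servers

  def expand(self, pbegin):
    return (
        pbegin
        | beam.Impulse()
        | beam.FlatMap(self._no_portable_implementation))

  @staticmethod
  def _no_portable_implementation(unused_impulse):
    raise NotImplementedError(
        'No SDK-side Kafka implementation; the runner must override %s'
        % KAFKA_READ_URN)

  def to_runner_api_parameter(self, unused_context):
    # JSON is only for illustration; a real spec would more likely define
    # a proto payload so all SDKs and runners can parse it.
    payload = json.dumps({
        'topics': self._topics,
        'bootstrap_servers': self._bootstrap_servers,
    }).encode('utf-8')
    return KAFKA_READ_URN, payload
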
>>>>>>>> Kenn

>>>>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:

>>>>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <chamik...@google.com> wrote:

>>>>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <hero...@google.com> wrote:

>>>>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn that fails loudly if it's actually called in the short term rather than a full Python implementation.

>>>>>>>>>>> For configurable runner-native IO, for now, I think it is reasonable to use a URN + special data payload directly without a KafkaDoFn -- assuming it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as well. I agree that a non-native alternative implementation is desirable, but if one is not present we should IMO rather fail at job submission instead of at runtime. I could imagine connectors intrinsic to an execution engine where non-native implementations are not possible.

>>>>>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand similarly to any other SDF by default (initial splitting, GBK, and a map-task equivalent, for example), but a runner (Flink in this case) will be free to override it with a runner-native implementation if desired. I assume the runner will have a chance to perform this override before the SDF expansion (which is not fully designed yet). Providing separate source/sink transforms for Flink-native Kafka will be an option as well, but that will be less desirable from a Python user API perspective.

>>>>>>>>> Are we sure that the internal SDF will provide the same functionality as the native one? What if the Kafka SDF is in the middle of a pipeline - can Flink support that? Having a separate transform for the Flink native source might be a better user experience than having one that changes its behavior in strange ways depending on the runner.

>>>>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <rober...@google.com> wrote:

>>>>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <t...@apache.org> wrote:

>>>>>>>>>>>> > Hi Cham,
>>>>>>>>>>>> > Thanks for the feedback!

>>>>>>>>>>>> > I should have probably clarified that my POC and questions aren't specific to Kafka as a source, but pretty much any other source/sink that we internally use as well. We have existing Flink pipelines that are written in Java and we want to use the same connectors with the Python SDK on top of the already operationalized Flink stack. Therefore, portability isn't a concern as much as the ability to integrate is.

>>>>>>>>>> Thanks for the clarification.
>>>>>>>>>> Agree that providing runner-native implementations of established source/sinks can be desirable in some cases.

>>>>>>>>>>>> > -->
>>>>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath <chamik...@google.com> wrote:

>>>>>>>>>>>> >> Hi Thomas,

>>>>>>>>>>>> >> Seems like we are working on similar (partially) things :).

>>>>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <t...@apache.org> wrote:

>>>>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as a custom streaming source for a Python pipeline executing on the (in-progress) portable Flink runner.

>>>>>>>>>>>> >>> We eventually want to use the same native Flink connectors for sources and sinks that we also use in other Flink jobs.

>>>>>>>>>>>> >> Could you clarify what you mean by the same Flink connector? Do you mean that Beam-based and non-Beam-based versions of Flink will use the same Kafka connector implementation?

>>>>>>>>>>>> > The native Flink sources as shown in the example below, not the Beam KafkaIO or other Beam sources.

>>>>>>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading from Kafka and a Python lambda logging the value. The code is here:
>>>>>>>>>>>> >>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9

>>>>>>>>>>>> >>> I'm looking for feedback/opinions on the following items in particular:

>>>>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner (a custom translator could be loaded with ServiceLoader, additional translations could also be specified as job server configuration, pipeline option, ...)

>>>>>>>>>>>> >>> * For the Python side, is what's shown in the commit the recommended way to define a custom transform (it would eventually live in a reusable custom module that pipeline authors can import)? Also, the example does not have the configuration part covered yet.

>>>>>>>>>>>> >> The only standard unbounded source API offered by the Python SDK is the Splittable DoFn API. This is the part I'm working on. I'm trying to add a Kafka connector for the Beam Python SDK using the SDF API. JIRA is https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing different Kafka Python client libraries. Will share more information on this soon.

>>>>>>>>>>>> >> I understand this might not be possible in all cases and we might want to consider adding native source/sink implementations. But this will result in the implementation being runner-specific (each runner will have to have its own source/sink implementation).
>>>>>>>>>>>> >> So I think we should try to add connector implementations to Beam using the standard API whenever possible. We also have plans to implement support for cross-SDK transforms in the future (so that we can utilize a Java implementation from Python, for example), but we are not there yet, and we might still want to implement a connector for a given SDK if there's good client library support.

>>>>>>>>>>>> > It is great that the Python SDK will have connectors that are written in Python in the future, but I think it is equally if not more important to be able to use at least the Java Beam connectors with the Python SDK (and any other non-Java SDK). Especially in a fully managed environment it should be possible to offer this to users in a way that is largely transparent. It takes significant time and effort to mature connectors and I'm not sure it is realistic to repeat that for all external systems in multiple languages. Or, to put it another way, it is likely that instead of one rock-solid connector per external system maturing over time, there will be multiple less mature implementations. That's also the reason we internally want to use the Flink native connectors - we know what they can and cannot do and want to leverage the existing investment.

>>>>>>>>>>>> There are two related issues here: how to specify transforms (such as sources) in a language-independent manner, and how specific runners can recognize and run them. URNs solve both: the composite ReadFromKafka PTransform (that consists of an Impulse + SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully specifies this read. (The KafkaDoFn could similarly have a URN and payload.) A runner that understands these URNs is free to make any (semantically-equivalent) substitutions it wants for this transform.

>>>>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but in the short term it could be a DoFn that fails loudly if it's actually called, rather than a full Python implementation. Eventually, we would like to be able to call out to another SDK to expand full transforms (e.g. more complicated ones like BigQueryIO).

>>>>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source only considers the message value and uses the byte coder that both sides understand. If I wanted to pass on the key and possibly other metadata to the Python transform (similar to KafkaRecord from Java KafkaIO), then a specific coder is needed. Such a coder could be written using protobuf, Avro, etc., but it would also need to be registered.
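
Until there is a generic tuple/record coder in the spec, one option for a record like that is a hand-rolled byte layout simple enough to reproduce on the Java side. A rough Python sketch follows; the KafkaRecord shape and the layout are made up for illustration, and a real cross-language coder would additionally need an agreed URN under which both SDKs register the matching implementation:

import struct
from collections import namedtuple

from apache_beam.coders import Coder

# Illustrative record shape, loosely modeled on Java KafkaIO's KafkaRecord.
KafkaRecord = namedtuple(
    'KafkaRecord', ['topic', 'partition', 'offset', 'key', 'value'])


class KafkaRecordCoder(Coder):
  """Deterministic, length-prefixed layout that Java could reproduce."""

  def encode(self, record):
    out = []
    for blob in (record.topic.encode('utf-8'), record.key, record.value):
      out.append(struct.pack('>i', len(blob)))  # 4-byte big-endian length
      out.append(blob)
    out.append(struct.pack('>iq', record.partition, record.offset))
    return b''.join(out)

  def decode(self, encoded):
    pos, blobs = 0, []
    for _ in range(3):  # topic, key, value
      (length,) = struct.unpack_from('>i', encoded, pos)
      pos += 4
      blobs.append(encoded[pos:pos + length])
      pos += length
    partition, offset = struct.unpack_from('>iq', encoded, pos)
    return KafkaRecord(blobs[0].decode('utf-8'), partition, offset,
                       blobs[1], blobs[2])

  def is_deterministic(self):
    return True

The Java side would implement the mirror-image encode/decode; agreeing on and registering a URN for such a coder is exactly the part a generic record coder spec would have to cover.
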
>>>>>>>>>>>> >> I think this requirement goes away if we implement Kafka in the Python SDK.

>>>>>>>>>>>> > Wouldn't this be needed for any cross-language pipeline? Or rather any that isn't only using PCollection<byte[]>? Is there a language-agnostic encoding for KV<?,?>, for example?

>>>>>>>>>>>> Yes, Coders are also specified by URN (+components and/or payload), and there are a couple of standard ones, including KV. See
>>>>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>>>>>> This is not a closed set.

>>>>>>>>>>>> - Robert
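
As a concrete illustration of that last point: if the native source hands records over as KV<bytes, bytes>, the Python side needs no custom coder at all, since the standard KV and bytes coders already cover that shape. A minimal sketch, with beam.Create standing in for the URN-backed native Kafka source (the coder inference via the KV type hint is an assumption here, not something the example verifies):

import apache_beam as beam
from apache_beam import typehints


def run():
  with beam.Pipeline() as p:
    _ = (
        p
        # Stand-in for the URN-backed native Kafka source; what matters is
        # that elements cross the language boundary as KV of bytes, a shape
        # the model's standard KV and bytes coders already cover, so no
        # custom coder has to be registered on either side.
        | 'FakeKafka' >> beam.Create([(b'key-1', b'value-1'),
                                      (b'key-2', b'value-2')])
            .with_output_types(typehints.KV[bytes, bytes])
        | 'Format' >> beam.Map(lambda kv: 'key=%r value=%r' % kv))


if __name__ == '__main__':
  run()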