On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> I agree with Thomas' sentiment that cross-language IO is very important because of how much work it takes to produce a mature connector implementation in a language. Looking at implementations of BigQueryIO, PubSubIO, KafkaIO, and FileIO in Java, only a very daring soul would be tempted to reimplement them entirely in Python and Go.
>
> I'm imagining pretty much what Kenn is describing: a pipeline would specify some transforms by URN + payload, and rely on the runner to do whatever it takes to run this - either by expanding it into a Beam implementation of this transform that the runner chooses to use (it could be in the same language or in a different language; either way, the runner would indeed need to invoke the respective SDK to expand it given the parameters), or by doing something entirely runner-specific (e.g. using the built-in Flink Kafka connector).
>
> I don't see a reason to require that there *must* exist a Beam implementation of this transform. There only, ideally, must be a runner- and language-agnostic spec for the URN and payload; of course, the transform is then only as portable as the set of runners that implement this URN.

For a transform in general it's true that we don't need a Beam implementation, but for IOs specifically I think there are many benefits to having the implementation in Beam. For example:

- The IO connector will offer the same behavior and feature set across various runners/SDKs.
- The Beam community will be able to view/modify/improve the IO connector.
- Existing IO connectors will serve as examples for users who wish to develop new IO connectors.

> I actually really like the idea that the transform can be implemented in a completely runner-specific way without a Beam expansion to back it up - it would let us unblock a lot of the work earlier than full-blown cross-language IO is delivered, or even than SDFs work in all languages/runners.
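The runner-side choice Eugene describes (expand a URN + payload into a Beam implementation, or substitute something runner-specific) can be sketched as a toy override registry. Everything below is invented for illustration - the URN, the class names, and the subgraph strings are not Beam's actual API:

```python
# Toy model of URN-based transform substitution: a runner keeps a registry
# of native overrides keyed by URN and falls back to the transform's
# default Beam expansion when no override applies.

# Hypothetical URN, for illustration only.
KAFKA_READ_URN = "beam:transform:example:kafka_read:v1"

class Transform:
    def __init__(self, urn, payload, default_expansion):
        self.urn = urn
        self.payload = payload
        self.default_expansion = default_expansion  # callable(payload) -> subgraph

class Runner:
    def __init__(self):
        self._overrides = {}  # urn -> callable(payload) -> subgraph

    def register_override(self, urn, translator):
        self._overrides[urn] = translator

    def translate(self, transform):
        # Prefer a runner-native subgraph; otherwise use the Beam expansion.
        translator = self._overrides.get(transform.urn, transform.default_expansion)
        return translator(transform.payload)

flink = Runner()
flink.register_override(
    KAFKA_READ_URN,
    lambda payload: ["FlinkKafkaConsumer010(%s)" % payload["topic"]])

read = Transform(KAFKA_READ_URN, {"topic": "events"},
                 lambda payload: ["Impulse", "ParDo(KafkaSDF)"])

print(flink.translate(read))     # ['FlinkKafkaConsumer010(events)']
print(Runner().translate(read))  # ['Impulse', 'ParDo(KafkaSDF)']
```

The point of the sketch: the pipeline carries only the URN and payload, so the same pipeline is portable to any runner that either knows the URN natively or can fall back to a Beam expansion.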
If there are existing established connectors (for example, Kafka for Flink in this case) I agree. But for anybody developing a new IO connector, I think we should encourage developing it in Beam (in some SDK), given that the connector will then be available to all runners (and to all SDKs once we have cross-language transforms).

Thanks,
Cham

> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <k...@google.com> wrote:
>
>> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector, right? I was imagining: the Python SDK submits a pipeline with a KafkaIO (with URN + payload), maybe with bogus contents. It is replaced with a small Flink subgraph, including the native Flink Kafka connector and some compensating transforms to match the required semantics. To me, this is preferable to making single-runner transform URNs, since that breaks runner portability by definition.
>>
>> Kenn
>>
>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>
>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>>> The premise of URN + payload is that you can establish a spec. A native override still needs to meet the spec - it may still require some compensating code. Worrying about weird differences between runners seems more about worrying that an adequate spec cannot be determined.
>>>>
>>>> My point exactly. An SDF-based KafkaIO can run in the middle of a pipeline. E.g. we could have TextIO producing a list of topics, and the SDF KafkaIO run after that on this dynamic (not known until runtime) list of topics. If the native Flink source doesn't work this way, then it doesn't share the same spec and should have a different URN.
>>>
>>> Agree that if they cannot share the same spec, SDF and native transforms warrant different URNs.
>>> Native Kafka might be able to support a PCollection of topics/partitions as an input, though, by utilizing the underlying native Flink Kafka connector as a library. On the other hand, we might decide to expand SDF-based ParDos into other transforms before a runner gets a chance to override them, in which case this kind of replacement will not be possible.
>>>
>>> Thanks,
>>> Cham
>>>
>>>>> Runners will already invoke the SDF differently, so users treating every detail of some implementation as the spec are doomed.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <hero...@google.com> wrote:
>>>>>>>
>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn that fails loudly if it's actually called in the short term rather than a full Python implementation.
>>>>>>>>
>>>>>>>> For configurable runner-native IO, for now, I think it is reasonable to use a URN + special data payload directly without a KafkaDoFn -- assuming it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow, and something similar would work for Kafka-on-Flink as well. I agree that a non-native alternative implementation is desirable, but if one is not present we should IMO rather fail at job submission instead of at runtime. I could imagine connectors intrinsic to an execution engine where non-native implementations are not possible.
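Henning's "URN + special data payload" approach, including failing at job submission rather than at runtime when a runner doesn't recognize the URN, might look roughly like the sketch below. The URN and the JSON payload shape are invented for illustration; Beam models real transform payloads as protobufs, not JSON:

```python
import json

# Hypothetical URN, for illustration only.
KAFKA_READ_URN = "beam:transform:example:kafka_read:v1"

def encode_read(bootstrap_servers, topic):
    """Encode a Kafka read spec as a (urn, payload-bytes) pair."""
    payload = json.dumps(
        {"bootstrap_servers": bootstrap_servers, "topic": topic},
        sort_keys=True).encode("utf-8")
    return KAFKA_READ_URN, payload

def submit(urn, payload, runner_known_urns):
    """Reject the job at submission time, not at runtime, if the runner
    cannot translate the URN (and no non-native fallback exists)."""
    if urn not in runner_known_urns:
        raise ValueError("job rejected at submission: unknown URN %s" % urn)
    # A runner that knows the URN can recover the full spec from the payload.
    return json.loads(payload.decode("utf-8"))

urn, payload = encode_read(["broker1:9092"], "events")
spec = submit(urn, payload, runner_known_urns={KAFKA_READ_URN})
print(spec["topic"])  # events
```

A runner with no translation for the URN raises immediately at `submit`, which is the fail-early behavior argued for above.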
>>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand similarly to any other SDF by default (initial splitting, GBK, and a map-task equivalent, for example), but a runner (Flink in this case) will be free to override it with a runner-native implementation if desired. I assume the runner will have a chance to perform this override before the SDF expansion (which is not fully designed yet). Providing separate source/sink transforms for Flink-native Kafka would be an option as well, but that would be less desirable from a Python user API perspective.

>>>>>> Are we sure that the internal SDF will provide the same functionality as the native one? What if the Kafka SDF is in the middle of a pipeline - can Flink support that? Having a separate transform for the Flink native source might be a better user experience than having one that changes its behavior in strange ways depending on the runner.

>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>> > Hi Cham,
>>>>>>>>> >
>>>>>>>>> > Thanks for the feedback!
>>>>>>>>> >
>>>>>>>>> > I should have probably clarified that my POC and questions aren't specific to Kafka as a source, but apply to pretty much any other source/sink that we use internally as well. We have existing Flink pipelines that are written in Java, and we want to use the same connectors with the Python SDK on top of the already operationalized Flink stack. Therefore, portability isn't a concern as much as the ability to integrate is.
>>>>>>>
>>>>>>> Thanks for the clarification.
>>>>>>> Agree that providing runner-native implementations of established sources/sinks can be desirable in some cases.

>>>>>>>>> > -->
>>>>>>>>> >
>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath <chamik...@google.com> wrote:
>>>>>>>>> >
>>>>>>>>> >> Hi Thomas,
>>>>>>>>> >>
>>>>>>>>> >> Seems like we are working on (partially) similar things :).
>>>>>>>>> >>
>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <t...@apache.org> wrote:
>>>>>>>>> >>
>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as a custom streaming source for a Python pipeline executing on the (in-progress) portable Flink runner.
>>>>>>>>> >>>
>>>>>>>>> >>> We eventually want to use the same native Flink connectors for sources and sinks that we also use in other Flink jobs.
>>>>>>>>> >>
>>>>>>>>> >> Could you clarify what you mean by the same Flink connector? Do you mean that Beam-based and non-Beam-based Flink pipelines will use the same Kafka connector implementation?
>>>>>>>>> >
>>>>>>>>> > The native Flink sources as shown in the example below, not the Beam KafkaIO or other Beam sources.
>>>>>>>>> >
>>>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading from Kafka and a Python lambda logging the value.
>>>>>>>>> >>> The code is here: https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>> >>>
>>>>>>>>> >>> I'm looking for feedback/opinions on the following items in particular:
>>>>>>>>> >>>
>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner (a custom translator could be loaded with ServiceLoader; additional translations could also be specified as job server configuration, pipeline option, ...)
>>>>>>>>> >>>
>>>>>>>>> >>> * For the Python side, is what's shown in the commit the recommended way to define a custom transform (it would eventually live in a reusable custom module that pipeline authors can import)? Also, the example does not have the configuration part covered yet.
>>>>>>>>> >>
>>>>>>>>> >> The only standard unbounded source API offered by the Python SDK is the Splittable DoFn API. This is the part I'm working on. I'm trying to add a Kafka connector for the Beam Python SDK using the SDF API. The JIRA is https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing different Kafka Python client libraries. Will share more information on this soon.
>>>>>>>>> >>
>>>>>>>>> >> I understand this might not be possible in all cases and we might want to consider adding native source/sink implementations. But this will result in the implementation being runner-specific (each runner will have to have its own source/sink implementation). So I think we should try to add connector implementations to Beam using the standard API whenever possible.
>>>>>>>>> >> We also have plans to implement support for cross-SDK transforms in the future (so that we can utilize the Java implementation from Python, for example), but we are not there yet, and we might still want to implement a connector for a given SDK if there's good client library support.
>>>>>>>>> >
>>>>>>>>> > It is great that the Python SDK will have connectors that are written in Python in the future, but I think it is equally if not more important to be able to use at least the Java Beam connectors with the Python SDK (and any other non-Java SDK). Especially in a fully managed environment it should be possible to offer this to users in a way that is largely transparent. It takes significant time and effort to mature connectors, and I'm not sure it is realistic to repeat that for all external systems in multiple languages. Or, to put it another way, it is likely that instead of one rock-solid connector per external system there will, over time, be multiple less mature implementations. That's also the reason we internally want to use the Flink native connectors - we know what they can and cannot do and want to leverage the existing investment.
>>>>>>>>>
>>>>>>>>> There are two related issues here: how to specify transforms (such as sources) in a language-independent manner, and how specific runners can recognize and run them; URNs solve both. The composite ReadFromKafka PTransform (which consists of an Impulse + SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully specifies this read. (The KafkaDoFn could similarly have a URN and payload.)
>>>>>>>>> A runner that understands these URNs is free to make any (semantically-equivalent) substitutions it wants for this transform.
>>>>>>>>>
>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but in the short term it could be a DoFn that fails loudly if it's actually called, rather than a full Python implementation. Eventually, we would like to be able to call out to another SDK to expand full transforms (e.g. more complicated ones like BigQueryIO).
>>>>>>>>>
>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source only considers the message value and uses the byte coder that both sides understand. If I wanted to pass on the key and possibly other metadata to the Python transform (similar to KafkaRecord from the Java KafkaIO), then a specific coder is needed. Such a coder could be written using protobuf, Avro, etc., but it would also need to be registered.
>>>>>>>>> >>
>>>>>>>>> >> I think this requirement goes away if we implement Kafka in the Python SDK.
>>>>>>>>> >
>>>>>>>>> > Wouldn't this be needed for any cross-language pipeline? Or rather any that isn't only using PCollection<byte[]>? Is there a language-agnostic encoding for KV<?,?>, for example?
>>>>>>>>>
>>>>>>>>> Yes, coders are also specified by URN (+ components and/or payload), and there are a couple of standard ones, including KV. See https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md - this is not a closed set.
>>>>>>>>>
>>>>>>>>> - Robert
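As a concrete illustration of what a language-agnostic KV encoding can look like: the common_urns.md file linked above lists standard coders identified by URNs such as beam:coder:bytes:v1 and beam:coder:kv:v1. Below is a simplified sketch assuming both components are byte strings in a length-prefixed form; consult the Beam coder spec for the authoritative byte layout, including its nested/outer context rules, which this sketch glosses over:

```python
# Simplified sketch of a KV<bytes, bytes> encoding in the spirit of Beam's
# standard coders: each component is length-prefixed with a base-128
# varint, and the two encodings are concatenated. Any SDK or runner that
# follows the same layout can decode the result, which is what makes the
# coder language-agnostic.

def encode_varint(n):
    """Unsigned base-128 varint, least-significant 7 bits first."""
    out = bytearray()
    while True:
        bits = n & 0x7F
        n >>= 7
        if n:
            out.append(bits | 0x80)  # continuation bit set
        else:
            out.append(bits)
            return bytes(out)

def encode_length_prefixed(b):
    # Bytes component: varint length, then the raw bytes.
    return encode_varint(len(b)) + b

def encode_kv(key, value):
    # KV coder: concatenation of the component encodings.
    return encode_length_prefixed(key) + encode_length_prefixed(value)

print(encode_kv(b"key", b"value"))  # b'\x03key\x05value'
```

Because the layout is fixed by spec rather than by any one SDK's serializer, a Java producer and a Python consumer can agree on the bytes without sharing code.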