On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:

> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <k...@google.com> wrote:
>
>> The premise of URN + payload is that you can establish a spec. A native override still needs to meet the spec - it may still require some compensating code. Worrying about weird differences between runners seems more about worrying that an adequate spec cannot be determined.
>
> My point exactly. An SDF-based KafkaIO can run in the middle of a pipeline. E.g. we could have TextIO producing a list of topics, and the SDF KafkaIO run after that on this dynamic (not known until runtime) list of topics. If the native Flink source doesn't work this way, then it doesn't share the same spec and should have a different URN.

Agree that if they cannot share the same spec, SDF and native transforms warrant different URNs. Native Kafka might be able to support a PCollection of topics/partitions as an input, though, by utilizing the underlying native Flink Kafka connector as a library. On the other hand, we might decide to expand SDF-based ParDos into other transforms before a runner gets a chance to override them, in which case this kind of replacement will not be possible.
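To make the scenario concrete, here is a very rough sketch of what such a pipeline could look like in the Python SDK. ReadTopicsFromKafka is a hypothetical stub, not an existing transform; it only illustrates the shape of an SDF-based read whose topic list arrives as a PCollection at runtime.

```python
import apache_beam as beam


class ReadTopicsFromKafka(beam.PTransform):
    """Hypothetical SDF-based Kafka read whose input is a PCollection of
    topic names, so the set of topics is only known at pipeline runtime."""

    def __init__(self, bootstrap_servers):
        super().__init__()
        self._bootstrap_servers = bootstrap_servers

    def expand(self, topics):
        # Placeholder expansion; a real implementation would apply a
        # splittable KafkaDoFn to each topic/partition element here.
        return topics | beam.FlatMap(lambda topic: [])


with beam.Pipeline() as p:
    # The list of topics is itself produced by the pipeline (Reuven's example),
    # so it is not known until runtime.
    topics = p | 'ListTopics' >> beam.io.ReadFromText('/tmp/topics.txt')
    records = topics | 'ReadKafka' >> ReadTopicsFromKafka('kafka-broker:9092')
    records | 'Process' >> beam.Map(lambda record: record)
```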
Thanks,
Cham

>> Runners will already invoke the SDF differently, so users treating every detail of some implementation as the spec are doomed.

>> Kenn

>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:

>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <chamik...@google.com> wrote:

>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <hero...@google.com> wrote:

>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn that fails loudly if it's actually called in the short term rather than a full Python implementation.

>>>>> For configurable runner-native IO, for now, I think it is reasonable to use a URN + special data payload directly without a KafkaDoFn -- assuming it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as well. I agree that a non-native alternative implementation is desirable, but if one is not present we should IMO rather fail at job submission instead of at runtime. I could imagine connectors intrinsic to an execution engine where non-native implementations are not possible.

>>>> I think, in this case, KafkaDoFn can be an SDF that would expand similarly to any other SDF by default (initial splitting, GBK, and a map-task equivalent, for example), but a runner (Flink in this case) will be free to override it with a runner-native implementation if desired. I assume the runner will have a chance to perform this override before the SDF expansion (which is not fully designed yet). Providing separate source/sink transforms for Flink native Kafka would be an option as well, but that would be less desirable from a Python user API perspective.

>>> Are we sure that the internal SDF will provide the same functionality as the native one? What if the Kafka SDF is in the middle of a pipeline - can Flink support that? Having a separate transform for the Flink native source might be a better user experience than having one that changes its behavior in strange ways depending on the runner.

>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <rober...@google.com> wrote:

>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <t...@apache.org> wrote:

>>>>>> > Hi Cham,

>>>>>> > Thanks for the feedback!

>>>>>> > I should have probably clarified that my POC and questions aren't specific to Kafka as a source, but apply to pretty much any other source/sink that we use internally as well. We have existing Flink pipelines that are written in Java and we want to use the same connectors with the Python SDK on top of the already operationalized Flink stack. Therefore, portability isn't a concern as much as the ability to integrate is.

>>>> Thanks for the clarification. Agree that providing runner-native implementations of established sources/sinks can be desirable in some cases.
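To sketch what a runner-overridable composite could look like on the Python side: the URN string, payload, and transform below are made up for illustration. A runner that recognizes the URN (e.g. the portable Flink runner) could substitute its native Kafka source; otherwise the default expansion runs and its placeholder DoFn fails loudly, as Robert suggested.

```python
import apache_beam as beam

# Made-up URN and payload, for illustration only; the spec behind the URN,
# not this Python code, is the contract a runner-native override has to meet.
KAFKA_READ_URN = 'beam:transform:example:kafka_read:v1'


class _FailLoudlyKafkaDoFn(beam.DoFn):
    # Stand-in for a real splittable KafkaDoFn: fails loudly if no
    # runner-native override was substituted for the composite below.
    def process(self, element):
        raise NotImplementedError(
            'No runner-native override was applied for %s' % KAFKA_READ_URN)


class KafkaRead(beam.PTransform):
    def __init__(self, bootstrap_servers, topic):
        super().__init__()
        # This dict is what would be serialized as the URN's payload.
        self._payload = {'bootstrap_servers': bootstrap_servers, 'topic': topic}

    def expand(self, pbegin):
        # Default expansion: Impulse + (eventually splittable) KafkaDoFn. In the
        # portable proto the composite would carry KAFKA_READ_URN plus the
        # payload, so a runner such as the portable Flink runner can recognize
        # it and swap in its native Kafka source before SDF expansion.
        return (pbegin
                | 'Impulse' >> beam.Impulse()
                | 'ReadKafka' >> beam.ParDo(_FailLoudlyKafkaDoFn()))
```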
>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath <chamik...@google.com> wrote:

>>>>>> >> Hi Thomas,

>>>>>> >> Seems like we are working on similar (partially overlapping) things :).

>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <t...@apache.org> wrote:

>>>>>> >>> I'm working on a mini POC to enable Kafka as a custom streaming source for a Python pipeline executing on the (in-progress) portable Flink runner.

>>>>>> >>> We eventually want to use the same native Flink connectors for sources and sinks that we also use in other Flink jobs.

>>>>>> >> Could you clarify what you mean by the same Flink connector? Do you mean that Beam-based and non-Beam-based Flink jobs will use the same Kafka connector implementation?

>>>>>> > The native Flink sources as shown in the example below, not the Beam KafkaIO or other Beam sources.

>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010 reading from Kafka and a Python lambda logging the value. The code is here: https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9

>>>>>> >>> I'm looking for feedback/opinions on the following items in particular:

>>>>>> >>> * Enabling custom translation on the Flink portable runner (a custom translator could be loaded with ServiceLoader; additional translations could also be specified as job server configuration, a pipeline option, ...)

>>>>>> >>> * For the Python side, is what's shown in the commit the recommended way to define a custom transform (it would eventually live in a reusable custom module that pipeline authors can import)? Also, the example does not have the configuration part covered yet.

>>>>>> >> The only standard unbounded source API offered by the Python SDK is the Splittable DoFn API. This is the part I'm working on: I'm trying to add a Kafka connector for the Beam Python SDK using the SDF API. The JIRA is https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing different Kafka Python client libraries and will share more information on this soon.

>>>>>> >> I understand this might not be possible in all cases and we might want to consider adding native source/sink implementations. But this will result in the implementation being runner-specific (each runner will have to have its own source/sink implementation). So I think we should try to add connector implementations to Beam using the standard API whenever possible. We also have plans to implement support for cross-SDK transforms in the future (so that we can utilize the Java implementation from Python, for example), but we are not there yet, and we might still want to implement a connector for a given SDK if there's good client library support.
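For reference, a very rough sketch of what an SDF-based KafkaDoFn might look like in Python. The Python SDF surface is still being designed, so the restriction/tracker names below (RestrictionProvider, RestrictionParam, OffsetRestrictionTracker) are assumptions based on the current proposal, and open_consumer() is a placeholder for whichever Kafka client library ends up being used.

```python
import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider


def open_consumer(bootstrap_servers, topic_partition):
    # Placeholder: would create a Kafka consumer (kafka-python, confluent-kafka,
    # ...) positioned at the given topic/partition, yielding records that carry
    # .offset, .key and .value attributes.
    raise NotImplementedError


class KafkaOffsetRestrictionProvider(RestrictionProvider):
    # Each input element is a (topic, partition) pair; its restriction is a
    # range of offsets to consume.
    def initial_restriction(self, topic_partition):
        return OffsetRange(0, 2 ** 63 - 1)  # effectively unbounded

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)


class KafkaDoFn(beam.DoFn):
    def __init__(self, bootstrap_servers):
        super().__init__()
        self._bootstrap_servers = bootstrap_servers

    def process(self, topic_partition,
                tracker=beam.DoFn.RestrictionParam(KafkaOffsetRestrictionProvider())):
        for record in open_consumer(self._bootstrap_servers, topic_partition):
            # Claim each offset before emitting; the runner can split the
            # remaining offset range between calls.
            if not tracker.try_claim(record.offset):
                return
            yield record.key, record.value
```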
>>>>>> > It is great that the Python SDK will have connectors that are written in Python in the future, but I think it is equally if not more important to be able to use at least the Java Beam connectors with the Python SDK (and any other non-Java SDK). Especially in a fully managed environment, it should be possible to offer this to users in a way that is largely transparent. It takes significant time and effort to mature connectors, and I'm not sure it is realistic to repeat that for all external systems in multiple languages. Or, to put it another way, instead of one rock-solid connector per external system that matures over time, there will likely be multiple less mature implementations. That's also the reason we internally want to use the Flink native connectors - we know what they can and cannot do and want to leverage the existing investment.

>>>>>> There are two related issues here: how to specify transforms (such as sources) in a language-independent manner, and how specific runners can recognize and run them. URNs solve both: the composite ReadFromKafka PTransform (which consists of an Impulse + SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully specifies this read. (The KafkaDoFn could similarly have a URN and payload.) A runner that understands these URNs is free to make any (semantically equivalent) substitutions it wants for this transform.

>>>>>> Note that a KafkaDoFn still needs to be provided, but in the short term it could be a DoFn that fails loudly if it's actually called, rather than a full Python implementation. Eventually, we would like to be able to call out to another SDK to expand full transforms (e.g. more complicated ones like BigQueryIO).

>>>>>> >>> * Cross-language coders: In this example the Kafka source only considers the message value and uses the byte coder that both sides understand. If I wanted to pass on the key and possibly other metadata to the Python transform (similar to KafkaRecord from the Java KafkaIO), then a specific coder is needed. Such a coder could be written using protobuf, Avro, etc., but it would also need to be registered.

>>>>>> >> I think this requirement goes away if we implement Kafka in the Python SDK.

>>>>>> > Wouldn't this be needed for any cross-language pipeline? Or rather, any that isn't only using PCollection<byte[]>? Is there a language-agnostic encoding for KV<?,?>, for example?

>>>>>> Yes, Coders are also specified by URN (+ components and/or payload), and there are a couple of standard ones, including KV. See https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md. This is not a closed set.

>>>>>> - Robert
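As a small illustration of the KV point: a Kafka record exposed to Python as a (key, value) pair of raw bytes can, I believe, be carried by the standard KV coder with bytes components, so both the Java and Python sides agree on the encoding. The exact standard URN strings (e.g. beam:coder:kv:v1, beam:coder:bytes:v1) are the ones listed in common_urns.md and may differ between versions.

```python
from apache_beam import coders

# A Kafka record surfaced to Python as a (key, value) pair of raw bytes. A
# two-component tuple coder of bytes coders should map onto the standard
# KV-of-bytes coder in the portable representation, which both the Java and
# Python SDKs understand.
kafka_record_coder = coders.TupleCoder([coders.BytesCoder(), coders.BytesCoder()])

encoded = kafka_record_coder.encode((b'device-42', b'{"temperature": 21.3}'))
key, value = kafka_record_coder.decode(encoded)
assert key == b'device-42'
```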