On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> I agree with Thomas' sentiment that cross-language IO is very important
> because of how much work it takes to produce a mature connector
> implementation in a language. Looking at implementations of BigQueryIO,
> PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted
> to reimplement them entirely in Python and Go.
>
> I'm imagining pretty much what Kenn is describing: a pipeline would
> specify some transforms by URN + payload, and rely on the runner to do
> whatever it takes to run this - either by expanding it into a Beam
> implementation of this transform that the runner chooses to use (could be
> in the same language or in a different language; either way, the runner
> would indeed need to invoke the respective SDK to expand it given the
> parameters), or by doing something entirely runner-specific (e.g. using the
> built-in Flink Kafka connector).
>
> I don't see a reason to require that there *must* exist a Beam
> implementation of this transform. There only, ideally, must be a runner-
> and language-agnostic spec for the URN and payload; of course, then the
> transform is only as portable as the set of runners that implement this URN.
>

For a transform in general it's true that we don't need a Beam
implementation, but more specifically for IOs I think there are many
benefits to having the implementation in Beam. For example,

   - IO connector will offer same behavior and feature set across various
   runners/SDKs.
   - Beam community will be able to view/modify/improve the IO connector.
   - existing IO connectors will serve as examples for users who wish to
   develop new IO connectors



> I actually really like the idea that the transform can be implemented in a
> completely runner-specific way without a Beam expansion to back it up - it
> would let us unblock a lot of the work earlier than full-blown
> cross-language IO is delivered or even than SDFs work in all
> languages/runners.
>

If there are existing established connectors (for example, Kafka for Flink
in this case) I agree. But for anybody developing a new IO connector, I
think we should encourage developing that in Beam (in some SDK) given that
the connector will be available to all runners (and to all SDKs once we
have cross-language transforms).

Thanks,
Cham


>
> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <k...@google.com> wrote:
>
>> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka connector,
>> right? I was imagining: Python SDK submits pipeline with a KafkaIO (with
>> URN + payload) maybe bogus contents. It is replaced with a small Flink
>> subgraph, including the native Flink Kafka connector and some compensating
>> transfoms to match the required semantics. To me, this is preferable to
>> making single-runner transform URNs, since that breaks runner portability
>> by definition.
>>
>> Kenn
>>
>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <k...@google.com> wrote:
>>>>
>>>>> The premise of URN + payload is that you can establish a spec. A
>>>>> native override still needs to meet the spec - it may still require some
>>>>> compensating code. Worrying about weird differences between runners seems
>>>>> more about worrying that an adequate spec cannot be determined.
>>>>>
>>>>
>>>> My point exactly. a SDF-based KafkaIO can run in the middle of a
>>>> pipeline. E.g. we could have TextIO producing a list of topics, and the SDF
>>>> KafkaIO run after that on this dynamic (not known until runtime) list of
>>>> topics. If the native Flink source doesn't work this way, then it doesn't
>>>> share the same spec and should have a different URN.
>>>>
>>>
>>> Agree that if they cannot share the same spec, SDF and native transforms
>>> warrant different URNs. Native Kafka might be able to support a PCollection
>>> of topics/partitions as an input though by utilizing underlying native
>>> Flink Kafka connector as a library. On the other hand, we might decide to
>>> expand SDF based ParDos into to other transforms before a runner gets a
>>> chance to override in which case this kind of replacements will not be
>>> possible.
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>
>>>>> Runners will already invoke the SDF differently, so users treating
>>>>> every detail of some implementation as the spec are doomed.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>>>> chamik...@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <hero...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>>> DoFn that
>>>>>>>> > fails loudly if it's actually called in the short term rather
>>>>>>>> than a full
>>>>>>>> > Python implementation.
>>>>>>>>
>>>>>>>> For configurable runner-native IO, for now, I think it is
>>>>>>>> reasonable to use a URN + special data payload directly without a 
>>>>>>>> KafkaDoFn
>>>>>>>> -- assuming it's a portable pipeline. That's what we do in Go for
>>>>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink 
>>>>>>>> as
>>>>>>>> well. I agree that non-native alternative implementation is desirable, 
>>>>>>>> but
>>>>>>>> if one is not present we should IMO rather fail at job submission 
>>>>>>>> instead
>>>>>>>> of at runtime. I could imagine connectors intrinsic to an execution 
>>>>>>>> engine
>>>>>>>> where non-native implementations are not possible.
>>>>>>>>
>>>>>>>
>>>>>>> I think, in this case, KafkaDoFn can be a SDF that would expand
>>>>>>> similar to any other SDF by default (initial splitting, GBK, and a 
>>>>>>> map-task
>>>>>>> equivalent, for example) but a runner (Flink in this case) will be free 
>>>>>>> to
>>>>>>> override it with an runner-native implementation if desired. I assume
>>>>>>> runner will have a chance to perform this override before the SDF 
>>>>>>> expansion
>>>>>>> (which is not fully designed yet). Providing a separate source/sink
>>>>>>> transforms for Flink native Kafka will be an option as well, but that 
>>>>>>> will
>>>>>>> be less desirable from a Python user API perspective.
>>>>>>>
>>>>>>
>>>>>> Are we sure that the internal SDF will provide the same functionality
>>>>>> as the native one? What if the Kafka SDF is in the middle of a pipeline -
>>>>>> can Flink support that? Having a separate transform for the Flink native
>>>>>> source might be a better user experience than having one that changes its
>>>>>> behavior in strange ways depending on the runner.
>>>>>>
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <
>>>>>>>> rober...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <t...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> > Hi Cham,
>>>>>>>>>
>>>>>>>>> > Thanks for the feedback!
>>>>>>>>>
>>>>>>>>> > I should have probably clarified that my POC and questions aren't
>>>>>>>>> specific to Kafka as source, but pretty much any other source/sink
>>>>>>>>> that we
>>>>>>>>> internally use as well. We have existing Flink pipelines that are
>>>>>>>>> written
>>>>>>>>> in Java and we want to use the same connectors with the Python SDK
>>>>>>>>> on top
>>>>>>>>> of the already operationalized Flink stack. Therefore, portability
>>>>>>>>> isn't a
>>>>>>>>> concern as much as the ability to integrate is.
>>>>>>>>>
>>>>>>>>
>>>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>>>> implementations of established source/sinks can be can be desirable in 
>>>>>>> some
>>>>>>> cases.
>>>>>>>
>>>>>>>
>>>>>>>>> > -->
>>>>>>>>>
>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>>>> > <chamik...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> >> Hi Thomas,
>>>>>>>>>
>>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>>>
>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <t...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming
>>>>>>>>> source
>>>>>>>>> for a Python pipeline executing on the (in-progress) portable
>>>>>>>>> Flink runner.
>>>>>>>>>
>>>>>>>>> >>> We eventually want to use the same native Flink connectors for
>>>>>>>>> sources
>>>>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> Could you clarify what you mean by same Flink connector ? Do
>>>>>>>>> you mean
>>>>>>>>> that Beam-based and non-Beam-based versions of Flink will use the
>>>>>>>>> same
>>>>>>>>> Kafka connector implementation ?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> > The native Flink sources as shown in the example below, not the
>>>>>>>>> Beam
>>>>>>>>> KafkaIO or other Beam sources.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>>>>>> reading
>>>>>>>>> from Kafka and a Python lambda logging the value. The code is here:
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>>>> particular:
>>>>>>>>>
>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner
>>>>>>>>> (custom
>>>>>>>>> translator could be loaded with ServiceLoader, additional
>>>>>>>>> translations
>>>>>>>>> could also be specified as job server configuration, pipeline
>>>>>>>>> option, ...)
>>>>>>>>>
>>>>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>>>>> recommended
>>>>>>>>> way to define a custom transform (it would eventually live in a
>>>>>>>>> reusable
>>>>>>>>> custom module that pipeline authors can import)? Also, the example
>>>>>>>>> does not
>>>>>>>>> have the configuration part covered yet..
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> The only standard unbounded source API offered by Python SDK is
>>>>>>>>> the
>>>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying
>>>>>>>>> to add a
>>>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>>>> comparing
>>>>>>>>> different Kafka Python client libraries. Will share more
>>>>>>>>> information on
>>>>>>>>> this soon.
>>>>>>>>>
>>>>>>>>> >> I understand this might not be possible in all cases and we
>>>>>>>>> might want
>>>>>>>>> to consider adding a native source/sink implementations. But this
>>>>>>>>> will
>>>>>>>>> result in the implementation being runner-specific (each runner
>>>>>>>>> will have
>>>>>>>>> to have it's own source/sink implementation). So I think we should
>>>>>>>>> try to
>>>>>>>>> add connector implementations to Beam using the standard API
>>>>>>>>> whenever
>>>>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>>>>> transforms
>>>>>>>>> in the future (so that we can utilize Java implementation from
>>>>>>>>> Python for
>>>>>>>>> example) but we are not there yet and we might still want to
>>>>>>>>> implement a
>>>>>>>>> connector for a given SDK if there's good client library support.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>>>>> written in
>>>>>>>>> Python in the future, but I think it is equally if not more
>>>>>>>>> important to be
>>>>>>>>> able to use at least the Java Beam connectors with Python SDK (and
>>>>>>>>> any
>>>>>>>>> other non-Java SDK). Especially in a fully managed environment it
>>>>>>>>> should be
>>>>>>>>> possible to offer this to users in a way that is largely
>>>>>>>>> transparent. It
>>>>>>>>> takes significant time and effort to mature connectors and I'm not
>>>>>>>>> sure it
>>>>>>>>> is realistic to repeat that for all external systems in multiple
>>>>>>>>> languages.
>>>>>>>>> Or, to put it in another way, it is likely that instead of one
>>>>>>>>> over time
>>>>>>>>> rock solid connector per external system there will be multiple
>>>>>>>>> less mature
>>>>>>>>> implementations. That's also the reason we internally want to use
>>>>>>>>> the Flink
>>>>>>>>> native connectors - we know what they can and cannot do and want to
>>>>>>>>> leverage the existing investment.
>>>>>>>>>
>>>>>>>>> There are two related issues here: how to specify transforms (such
>>>>>>>>> as
>>>>>>>>> sources) in a language-independent manner, and how specific
>>>>>>>>> runners can
>>>>>>>>> recognize and run them, but URNs solve both. For  this we use
>>>>>>>>> URNs: the
>>>>>>>>> composite ReadFromKafka PTransform (that consists of a Impulse +
>>>>>>>>> SDF(KafkaDoFn)) to encodes to a URN with an attached payload that
>>>>>>>>> fully
>>>>>>>>> specifies this read. (The KafkaDoFn could similarly have a URN and
>>>>>>>>> payload.) A runner that understands these URNs is free to make any
>>>>>>>>> (semantically-equivalent) substitutions it wants for this
>>>>>>>>> transform.
>>>>>>>>>
>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>>>> DoFn that
>>>>>>>>> fails loudly if it's actually called in the short term rather than
>>>>>>>>> a full
>>>>>>>>> Python implementation. Eventually, we would like to be able to
>>>>>>>>> call out to
>>>>>>>>> another SDK to expand full transforms (e.g. more complicated ones
>>>>>>>>> like
>>>>>>>>> BigQueryIO).
>>>>>>>>>
>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>>>>>> considers the message value and uses the byte coder that both sides
>>>>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>>>>> metadata to
>>>>>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO),
>>>>>>>>> then a
>>>>>>>>> specific coder is needed. Such coder could be written using
>>>>>>>>> protobuf, Avro
>>>>>>>>> etc, but it would also need to be registered.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> >> I think this requirement goes away if we implement Kafka in
>>>>>>>>> Python SDK.
>>>>>>>>>
>>>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or
>>>>>>>>> rather any
>>>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>>>>> agnostic
>>>>>>>>> encoding for KV<?,?>, for example?
>>>>>>>>>
>>>>>>>>> Yes, Coders are also specified by URN (+components and/or
>>>>>>>>> payload), and
>>>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>>>
>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>>> This is not a closed set.
>>>>>>>>>
>>>>>>>>> - Robert
>>>>>>>>>
>>>>>>>>

Reply via email to