On Fri, Apr 27, 2018 at 12:18 PM Thomas Weise <t...@apache.org> wrote:

>
> The ability to specify with URN and implement custom transforms is also
> important. Such transforms may not qualify for inclusion in Beam for a
> variety of reasons (only relevant for a specific environment or use case,
> dependencies/licensing, ...).
>

They don't need to be included in Beam - by design, a third party library
transform can specify its own URN and Payload to be put in the proto
representation. I'm not sure of the state of the code here, but I think the
current path is a shared dep on runners-core-construction and some
ServiceLoader shenanigans. Shading may be in place that breaks this.

Kenn
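
To make the URN + payload idea above concrete, here is a minimal sketch in plain Python. It does not use Beam or runners-core-construction; the URN string, TransformSpec, Runner, and flink_kafka_translator are all hypothetical names made up for illustration. It shows a runner resolving a transform by URN, falling back to an SDK-provided expansion, and failing at submission time when neither exists.

```python
import json
from dataclasses import dataclass
from typing import Callable, Dict, List, Optional

# Hypothetical URN for a third-party transform; not a URN defined by Beam.
KAFKA_READ_URN = "example:transform:kafka_read:v1"


@dataclass(frozen=True)
class TransformSpec:
    """Language-neutral description of a transform: URN plus opaque payload."""
    urn: str
    payload: bytes
    # Names of the sub-transforms from the SDK's own expansion, if any.
    sdk_expansion: Optional[List[str]] = None


# A runner-side translator turns a payload into a runner-native plan.
Translator = Callable[[bytes], List[str]]


class Runner:
    """Toy runner (assumption): keeps a URN -> translator registry."""

    def __init__(self, name: str) -> None:
        self.name = name
        self._translators: Dict[str, Translator] = {}

    def register(self, urn: str, translator: Translator) -> None:
        self._translators[urn] = translator

    def plan(self, spec: TransformSpec) -> List[str]:
        """Resolve a transform at submission time, before anything runs."""
        translator = self._translators.get(spec.urn)
        if translator is not None:
            return translator(spec.payload)
        if spec.sdk_expansion is not None:
            return list(spec.sdk_expansion)
        raise ValueError(f"{self.name}: no translation or expansion for {spec.urn}")


def flink_kafka_translator(payload: bytes) -> List[str]:
    # The payload format (JSON here) is part of the spec that goes with the URN.
    config = json.loads(payload.decode("utf-8"))
    return [f"NativeKafkaSource(topic={config['topic']})", "CompensatingMap"]


spec = TransformSpec(
    urn=KAFKA_READ_URN,
    payload=json.dumps({"topic": "events"}).encode("utf-8"),
)

flink_like = Runner("flink-like")
flink_like.register(KAFKA_READ_URN, flink_kafka_translator)
native_plan = flink_like.plan(spec)
```

A runner without a registered translator raises on plan() unless the spec carries an sdk_expansion, which is the "fail at job submission instead of at runtime" behavior discussed further down the thread.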




> For my specific experiment, I prefer the custom URN over trying to bend
> the implementation to mimic an SDF based KafkaIO that it wouldn't (and
> doesn't need to) be semantically equivalent to. At this point Beam
> doesn't have the spec and implementation for said KafkaIO, but it would be
> great to see an example of what it would look like. Following a Beam spec
> would absolutely make sense if the custom implementation is purely for
> optimization or similar purpose.
>
> I wanted to circle back to the coder related question. I see that we now
> have a proto definition for the standard transforms and coders, which is
> really nice:
>
>
> https://github.com/apache/beam/blob/42fac771814b119c162d40e9300f5a0d3afe0f48/model/pipeline/src/main/proto/beam_runner_api.proto#L521
>
> This enables interoperability between languages with some standard types
> (KV, ITERABLE etc.), but for a structure like KafkaRecord a custom coder
> would be required, implemented in both Java and Python. Any thoughts on
> providing a generic tuple/record coder as part of the spec?
>
> Thanks,
> Thomas
>
>
>
> On Fri, Apr 27, 2018 at 8:53 AM, Lukasz Cwik <lc...@google.com> wrote:
>
>>
>>
>> On Thu, Apr 26, 2018 at 8:38 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Thu, Apr 26, 2018 at 5:59 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> I agree with Thomas' sentiment that cross-language IO is very important
>>>> because of how much work it takes to produce a mature connector
>>>> implementation in a language. Looking at implementations of BigQueryIO,
>>>> PubSubIO, KafkaIO, FileIO in Java, only a very daring soul would be tempted
>>>> to reimplement them entirely in Python and Go.
>>>>
>>>> I'm imagining pretty much what Kenn is describing: a pipeline would
>>>> specify some transforms by URN + payload, and rely on the runner to do
>>>> whatever it takes to run this - either by expanding it into a Beam
>>>> implementation of this transform that the runner chooses to use (could be
>>>> in the same language or in a different language; either way, the runner
>>>> would indeed need to invoke the respective SDK to expand it given the
>>>> parameters), or by doing something entirely runner-specific (e.g. using the
>>>> built-in Flink Kafka connector).
>>>>
>>>> I don't see a reason to require that there *must* exist a Beam
>>>> implementation of this transform. There only, ideally, must be a runner-
>>>> and language-agnostic spec for the URN and payload; of course, then the
>>>> transform is only as portable as the set of runners that implement this URN.
>>>>
>>>
>>> For a transform in general it's true that we don't need a Beam
>>> implementation, but more specifically for IOs I think there are many
>>> benefits to having the implementation in Beam. For example,
>>>
>>>    - IO connector will offer same behavior and feature set across
>>>    various runners/SDKs.
>>>    - Beam community will be able to view/modify/improve the IO
>>>    connector.
>>>    - existing IO connectors will serve as examples for users who wish
>>>    to develop new IO connectors
>>>
>>>
>>>
>>    - More runners will be able to execute the user's pipeline.
>>
>>
>>>> I actually really like the idea that the transform can be implemented
>>>> in a completely runner-specific way without a Beam expansion to back it up
>>>> - it would let us unblock a lot of the work earlier than full-blown
>>>> cross-language IO is delivered, or even before SDFs work in all
>>>> languages/runners.
>>>>
>>>
>>> If there are existing established connectors (for example, Kafka for
>>> Flink in this case) I agree. But for anybody developing a new IO connector,
>>> I think we should encourage developing that in Beam (in some SDK) given
>>> that the connector will be available to all runners (and to all SDKs once
>>> we have cross-language transforms).
>>>
>>> Thanks,
>>> Cham
>>>
>>>
>>>>
>>>> On Wed, Apr 25, 2018 at 10:02 PM Kenneth Knowles <k...@google.com>
>>>> wrote:
>>>>
>>>>> It doesn't have to be 1:1 swapping KafkaIO for a Flink Kafka
>>>>> connector, right? I was imagining: Python SDK submits pipeline with a
>>>>> KafkaIO (with URN + payload), maybe with bogus contents. It is replaced
>>>>> with a small Flink subgraph, including the native Flink Kafka connector
>>>>> and some compensating transforms to match the required semantics. To me,
>>>>> this is preferable to making single-runner transform URNs, since that
>>>>> breaks runner portability by definition.
>>>>>
>>>>> Kenn
>>>>>
>>>>> On Wed, Apr 25, 2018 at 7:40 PM Chamikara Jayalath <
>>>>> chamik...@google.com> wrote:
>>>>>
>>>>>>
>>>>>>
>>>>>> On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <k...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> The premise of URN + payload is that you can establish a spec. A
>>>>>>>> native override still needs to meet the spec - it may still require
>>>>>>>> some compensating code. Worrying about weird differences between
>>>>>>>> runners seems more about worrying that an adequate spec cannot be
>>>>>>>> determined.
>>>>>>>>
>>>>>>>
>>>>>>> My point exactly. An SDF-based KafkaIO can run in the middle of a
>>>>>>> pipeline. E.g. we could have TextIO producing a list of topics, and the
>>>>>>> SDF KafkaIO run after that on this dynamic (not known until runtime)
>>>>>>> list of topics. If the native Flink source doesn't work this way, then
>>>>>>> it doesn't share the same spec and should have a different URN.
>>>>>>>
>>>>>>
>>>>>> Agree that if they cannot share the same spec, SDF and native
>>>>>> transforms warrant different URNs. Native Kafka might be able to support
>>>>>> a PCollection of topics/partitions as an input though, by utilizing the
>>>>>> underlying native Flink Kafka connector as a library. On the other hand,
>>>>>> we might decide to expand SDF-based ParDos into other transforms before
>>>>>> a runner gets a chance to override, in which case this kind of
>>>>>> replacement will not be possible.
>>>>>>
>>>>>> Thanks,
>>>>>> Cham
>>>>>>
>>>>>>
>>>>>>>
>>>>>>>> Runners will already invoke the SDF differently, so users treating
>>>>>>>> every detail of some implementation as the spec are doomed.
>>>>>>>>
>>>>>>>> Kenn
>>>>>>>>
>>>>>>>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <
>>>>>>>>> chamik...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <hero...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a
>>>>>>>>>>> > DoFn that fails loudly if it's actually called in the short term
>>>>>>>>>>> > rather than a full Python implementation.
>>>>>>>>>>>
>>>>>>>>>>> For configurable runner-native IO, for now, I think it is
>>>>>>>>>>> reasonable to use a URN + special data payload directly without a
>>>>>>>>>>> KafkaDoFn -- assuming it's a portable pipeline. That's what we do in
>>>>>>>>>>> Go for PubSub-on-Dataflow and something similar would work for
>>>>>>>>>>> Kafka-on-Flink as well. I agree that a non-native alternative
>>>>>>>>>>> implementation is desirable, but if one is not present we should IMO
>>>>>>>>>>> rather fail at job submission instead of at runtime. I could imagine
>>>>>>>>>>> connectors intrinsic to an execution engine where non-native
>>>>>>>>>>> implementations are not possible.
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> I think, in this case, KafkaDoFn can be an SDF that would expand
>>>>>>>>>> similar to any other SDF by default (initial splitting, GBK, and a
>>>>>>>>>> map-task equivalent, for example) but a runner (Flink in this case)
>>>>>>>>>> will be free to override it with a runner-native implementation if
>>>>>>>>>> desired. I assume the runner will have a chance to perform this
>>>>>>>>>> override before the SDF expansion (which is not fully designed yet).
>>>>>>>>>> Providing separate source/sink transforms for Flink native Kafka will
>>>>>>>>>> be an option as well, but that will be less desirable from a Python
>>>>>>>>>> user API perspective.
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Are we sure that the internal SDF will provide the same
>>>>>>>>> functionality as the native one? What if the Kafka SDF is in the
>>>>>>>>> middle of a pipeline - can Flink support that? Having a separate
>>>>>>>>> transform for the Flink native source might be a better user
>>>>>>>>> experience than having one that changes its behavior in strange ways
>>>>>>>>> depending on the runner.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <
>>>>>>>>>>> rober...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <t...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> > Hi Cham,
>>>>>>>>>>>>
>>>>>>>>>>>> > Thanks for the feedback!
>>>>>>>>>>>>
>>>>>>>>>>>> > I should have probably clarified that my POC and questions
>>>>>>>>>>>> aren't
>>>>>>>>>>>> specific to Kafka as source, but pretty much any other
>>>>>>>>>>>> source/sink that we
>>>>>>>>>>>> internally use as well. We have existing Flink pipelines that
>>>>>>>>>>>> are written
>>>>>>>>>>>> in Java and we want to use the same connectors with the Python
>>>>>>>>>>>> SDK on top
>>>>>>>>>>>> of the already operationalized Flink stack. Therefore,
>>>>>>>>>>>> portability isn't a
>>>>>>>>>>>> concern as much as the ability to integrate is.
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> Thanks for the clarification. Agree that providing runner-native
>>>>>>>>>> implementations of established source/sinks can be desirable in some
>>>>>>>>>> cases.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>> > -->
>>>>>>>>>>>>
>>>>>>>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>>>>>>>> > <chamik...@google.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> >> Hi Thomas,
>>>>>>>>>>>>
>>>>>>>>>>>> >> Seems like we are working on similar (partially) things :).
>>>>>>>>>>>>
>>>>>>>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <t...@apache.org>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>> >>> I'm working on a mini POC to enable Kafka as custom
>>>>>>>>>>>> streaming source
>>>>>>>>>>>> for a Python pipeline executing on the (in-progress) portable
>>>>>>>>>>>> Flink runner.
>>>>>>>>>>>>
>>>>>>>>>>>> >>> We eventually want to use the same native Flink connectors
>>>>>>>>>>>> for sources
>>>>>>>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> >> Could you clarify what you mean by same Flink connector? Do
>>>>>>>>>>>> you mean
>>>>>>>>>>>> that Beam-based and non-Beam-based versions of Flink will use
>>>>>>>>>>>> the same
>>>>>>>>>>>> Kafka connector implementation?
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> > The native Flink sources as shown in the example below, not
>>>>>>>>>>>> the Beam
>>>>>>>>>>>> KafkaIO or other Beam sources.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> >>> I got a simple example to work with the
>>>>>>>>>>>> FlinkKafkaConsumer010 reading
>>>>>>>>>>>> from Kafka and a Python lambda logging the value. The code is
>>>>>>>>>>>> here:
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>>>>>>>> particular:
>>>>>>>>>>>>
>>>>>>>>>>>> >>> * Enabling custom translation on the Flink portable runner
>>>>>>>>>>>> (custom
>>>>>>>>>>>> translator could be loaded with ServiceLoader, additional
>>>>>>>>>>>> translations
>>>>>>>>>>>> could also be specified as job server configuration, pipeline
>>>>>>>>>>>> option, ...)
>>>>>>>>>>>>
>>>>>>>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>>>>>>>> recommended
>>>>>>>>>>>> way to define a custom transform (it would eventually live in a
>>>>>>>>>>>> reusable
>>>>>>>>>>>> custom module that pipeline authors can import)? Also, the
>>>>>>>>>>>> example does not
>>>>>>>>>>>> have the configuration part covered yet.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> >> The only standard unbounded source API offered by Python SDK
>>>>>>>>>>>> is the
>>>>>>>>>>>> Splittable DoFn API. This is the part I'm working on. I'm
>>>>>>>>>>>> trying to add a
>>>>>>>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>>>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>>>>>>>> comparing
>>>>>>>>>>>> different Kafka Python client libraries. Will share more
>>>>>>>>>>>> information on
>>>>>>>>>>>> this soon.
>>>>>>>>>>>>
>>>>>>>>>>>> >> I understand this might not be possible in all cases and we
>>>>>>>>>>>> might want
>>>>>>>>>>>> to consider adding native source/sink implementations. But
>>>>>>>>>>>> this will
>>>>>>>>>>>> result in the implementation being runner-specific (each runner
>>>>>>>>>>>> will have
>>>>>>>>>>>> to have its own source/sink implementation). So I think we
>>>>>>>>>>>> should try to
>>>>>>>>>>>> add connector implementations to Beam using the standard API
>>>>>>>>>>>> whenever
>>>>>>>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>>>>>>>> transforms
>>>>>>>>>>>> in the future (so that we can utilize Java implementation from
>>>>>>>>>>>> Python for
>>>>>>>>>>>> example) but we are not there yet and we might still want to
>>>>>>>>>>>> implement a
>>>>>>>>>>>> connector for a given SDK if there's good client library
>>>>>>>>>>>> support.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>>>>>>>> written in
>>>>>>>>>>>> Python in the future, but I think it is equally if not more
>>>>>>>>>>>> important to be
>>>>>>>>>>>> able to use at least the Java Beam connectors with Python SDK
>>>>>>>>>>>> (and any
>>>>>>>>>>>> other non-Java SDK). Especially in a fully managed environment
>>>>>>>>>>>> it should be
>>>>>>>>>>>> possible to offer this to users in a way that is largely
>>>>>>>>>>>> transparent. It
>>>>>>>>>>>> takes significant time and effort to mature connectors and I'm
>>>>>>>>>>>> not sure it
>>>>>>>>>>>> is realistic to repeat that for all external systems in
>>>>>>>>>>>> multiple languages.
>>>>>>>>>>>> Or, to put it in another way, it is likely that instead of one
>>>>>>>>>>>> over time
>>>>>>>>>>>> rock solid connector per external system there will be multiple
>>>>>>>>>>>> less mature
>>>>>>>>>>>> implementations. That's also the reason we internally want to
>>>>>>>>>>>> use the Flink
>>>>>>>>>>>> native connectors - we know what they can and cannot do and
>>>>>>>>>>>> want to
>>>>>>>>>>>> leverage the existing investment.
>>>>>>>>>>>>
>>>>>>>>>>>> There are two related issues here: how to specify transforms
>>>>>>>>>>>> (such as
>>>>>>>>>>>> sources) in a language-independent manner, and how specific
>>>>>>>>>>>> runners can
>>>>>>>>>>>> recognize and run them. For this we use
>>>>>>>>>>>> URNs: the
>>>>>>>>>>>> composite ReadFromKafka PTransform (that consists of an Impulse +
>>>>>>>>>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload
>>>>>>>>>>>> that fully
>>>>>>>>>>>> specifies this read. (The KafkaDoFn could similarly have a URN
>>>>>>>>>>>> and
>>>>>>>>>>>> payload.) A runner that understands these URNs is free to make
>>>>>>>>>>>> any
>>>>>>>>>>>> (semantically-equivalent) substitutions it wants for this
>>>>>>>>>>>> transform.
>>>>>>>>>>>>
>>>>>>>>>>>> Note that a KafkaDoFn still needs to be provided, but could be
>>>>>>>>>>>> a DoFn that
>>>>>>>>>>>> fails loudly if it's actually called in the short term rather
>>>>>>>>>>>> than a full
>>>>>>>>>>>> Python implementation. Eventually, we would like to be able to
>>>>>>>>>>>> call out to
>>>>>>>>>>>> another SDK to expand full transforms (e.g. more complicated
>>>>>>>>>>>> ones like
>>>>>>>>>>>> BigQueryIO).
>>>>>>>>>>>>
>>>>>>>>>>>> >>> * Cross-language coders: In this example the Kafka source
>>>>>>>>>>>> only
>>>>>>>>>>>> considers the message value and uses the byte coder that both
>>>>>>>>>>>> sides
>>>>>>>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>>>>>>>> metadata to
>>>>>>>>>>>> the Python transform (similar to KafkaRecord from Java
>>>>>>>>>>>> KafkaIO), then a
>>>>>>>>>>>> specific coder is needed. Such coder could be written using
>>>>>>>>>>>> protobuf, Avro
>>>>>>>>>>>> etc, but it would also need to be registered.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> >> I think this requirement goes away if we implement Kafka in
>>>>>>>>>>>> Python SDK.
>>>>>>>>>>>>
>>>>>>>>>>>> > Wouldn't this be needed for any cross language pipeline? Or
>>>>>>>>>>>> rather any
>>>>>>>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>>>>>>>> agnostic
>>>>>>>>>>>> encoding for KV<?,?>, for example?
>>>>>>>>>>>>
>>>>>>>>>>>> Yes, Coders are also specified by URN (+components and/or
>>>>>>>>>>>> payload), and
>>>>>>>>>>>> there are a couple of standard ones, including KV. See
>>>>>>>>>>>>
>>>>>>>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>>>>>>>> This is not a closed set.
>>>>>>>>>>>>
>>>>>>>>>>>> - Robert
>>>>>>>>>>>>
>>>>>>>>>>>
>
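
On the generic tuple/record coder question raised in the thread, here is a rough sketch of what a language-neutral record encoding could look like. This is not Beam's wire format and not a coder defined in the Beam spec; the URN and the function names are hypothetical, and the layout (big-endian 4-byte count and length prefixes) is just one simple choice that would be easy to mirror in Java and Python.

```python
import struct
from typing import List, Tuple

# Hypothetical URN a custom record coder might be registered under.
RECORD_CODER_URN = "example:coder:bytes_record:v1"


def encode_record(fields: Tuple[bytes, ...]) -> bytes:
    """Encode fields as: 4-byte count, then (4-byte length, bytes) per field."""
    out = [struct.pack(">I", len(fields))]
    for field in fields:
        out.append(struct.pack(">I", len(field)))
        out.append(field)
    return b"".join(out)


def decode_record(data: bytes) -> Tuple[bytes, ...]:
    """Inverse of encode_record; rejects truncated or over-long input."""
    (count,) = struct.unpack_from(">I", data, 0)
    offset = 4
    fields: List[bytes] = []
    for _ in range(count):
        (length,) = struct.unpack_from(">I", data, offset)
        offset += 4
        fields.append(data[offset:offset + length])
        offset += length
    if offset != len(data):
        raise ValueError("record length does not match encoded data")
    return tuple(fields)


# A KafkaRecord-like tuple: key, value, topic, and offset, each as bytes.
record = (b"user-1", b"payload", "events".encode("utf-8"), struct.pack(">q", 42))
encoded = encode_record(record)
decoded = decode_record(encoded)
```

Each field is left as opaque bytes, so the two SDKs only have to agree on the framing and on how each field is interpreted (here, the last field is a big-endian signed 64-bit offset).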
