Although I suspect/hope that sharing IO connectors across SDKs will
adequately cover the lion's share of implementations (especially the long
tail), I also think it's a case-by-case decision to make. Native IO might
be preferable for some uses and each SDK will want IO implementations where
they shine or at least for reference. I think of these options as
complementary.

For cross-language IO connectors that uses user functions in an intimate
way, to Reuven's point, the IO connector will have to be implemented in a
way that makes each user function a transform so that it can be supplied
and executed in the user's SDK. The current practice of embedding user
functions in DoFns won't work. This will require more fusion breaks (and
coding of data) than otherwise needed and could be a performance penalty,
unless the IO connector can be written in a way that avoids the user
function as Kenn suggests.

Small +1 to Kenn's idea of auditing the existing IO connectors to get a
sense of which IO might be problematic. However, it might be a tad
premature to do too much until the cross-language transform feature is
fleshed out further.

Henning


On Mon, Apr 30, 2018 at 9:54 AM Kenneth Knowles <k...@google.com> wrote:

> I agree with Cham's motivations as far as "we need it now" and getting
> Python SDF up and running and exercised on a real connector.
>
> But I do find the current API of BigQueryIO to be a poor example. That
> particular functionality on BigQueryIO seems extraneous and goes against
> our own style guide [1]. The recommended way to write it would be for
> BigQueryIO to output a natural concrete type (like TableRow) and allow the
> following step to do conversions. This is a broader programming best
> practice - unless there is a compelling reason, you should just return the
> value rather than accept a higher-order function to operate on the value.
> Is there a compelling reason in this case? I just dug through the code and
> just see that it bottoms out in AvroSource where it does not seem to add
> functionality.
>
> Considering cross-language pipelines as a primary use case for all
> connectors, perhaps we should audit them and bring them into alignment now,
> deprecating paths using higher-order functions. We can still consider
> host-language convenience composites.
>
> For an unbounded source like KafkaIO the compelling reason is the
> timestamp extracting function to be able to maintain a watermark. Notably,
> PubsubIO does not accept such a function, but requires the timestamp to be
> in a metadata field that any language can describe (versus having to parse
> the message to pull out the timestamp).
>
> Kenn
>
> [1]
> https://beam.apache.org/contribute/ptransform-style-guide/#choosing-types-of-input-and-output-pcollections
>
> On Mon, Apr 30, 2018 at 9:27 AM Reuven Lax <re...@google.com> wrote:
>
>> Another point: cross-language IOs might add a performance penalty in many
>> cases. For an example of this look at BigQueryIO. The user can register a
>> SerializableFunction that is evaluated on every record, and determines
>> which destination to write the record to. Now a Python user would want to
>> register a Python function for this of course. this means that the Java IO
>> would have to invoke Python code for each record it sees, which will likely
>> be a big performance hit.
>>
>> Of course the downside of duplicating IOs is exactly as you say -
>> multiple versions to maintain, and potentially duplicate bugs. I think the
>> right answer will need to be on a case-by-case basis.
>>
>> Reuven
>>
>> On Mon, Apr 30, 2018 at 8:05 AM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>> Hi Aljoscha,
>>>
>>> I tried to cover this in the doc. Once we have full support for
>>> cross-language IO, we can decide this on a case-by-case basis. But I don't
>>> think we should cease defining new sources/sinks for Beam Python SDK till
>>> we get to that point. I think there are good reasons for adding Kafka
>>> support for Python today and many Beam users have request this. Also, note
>>> that proposed Python Kafka source will be based on the Splittable DoFn
>>> framework while the current Java version is based on the UnboundedSource
>>> framework. Here are the reasons that are currently listed in the doc.
>>>
>>>
>>>    -
>>>
>>>    Users might find it useful to have at least one unbounded source and
>>>    sink combination implemented in Python SDK and Kafka is the streaming
>>>    system that makes most sense to support if we just want to add support 
>>> for
>>>    only one such system in Python SDK.
>>>    -
>>>
>>>    Not all runners might support cross-language IO. Also some
>>>    user/runner/deployment combinations might require an unbounded 
>>> source/sink
>>>    implemented in Python SDK.
>>>    -
>>>
>>>    We recently added Splittable DoFn support to Python SDK. It will be
>>>    good to have at least one production quality Splittable DoFn that
>>>    will server as a good example for any users who wish to implement new
>>>    Splittable DoFn implementations on top of Beam Python SDK.
>>>    -
>>>
>>>    Cross-language transform feature is currently is in the initial
>>>    discussion phase and it could be some time before we can offer existing
>>>    Java implementation of Kafka for Python SDK users.
>>>    -
>>>
>>>    Cross-language IO might take even longer to reach the point where
>>>    it's fully equivalent in expressive power to a transform written in the
>>>    host language - e.g. supporting host-language lambdas as part of the
>>>    transform configuration is likely to take a lot longer than "first-order"
>>>    cross-language IO. KafkaIO in Java uses lambdas as part of transform
>>>    configuration, e.g. timestamp functions.
>>>
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Mon, Apr 30, 2018 at 2:14 AM Aljoscha Krettek <aljos...@apache.org>
>>> wrote:
>>>
>>>> Is this what we want to do in the long run, i.e. implement copies of
>>>> connectors for different SDKs? I thought the plan was to enable using
>>>> connectors written in different languages, i.e. use the Java Kafka I/O from
>>>> python. This way we wouldn't duplicate bugs for three different language
>>>> (Java, Python, and Go for now).
>>>>
>>>> Best,
>>>> Aljoscha
>>>>
>>>>
>>>> On 29. Apr 2018, at 20:46, Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>> Thanks Cham, this is great! I left just a couple of comments on the doc.
>>>>
>>>> On Fri, Apr 27, 2018 at 10:06 PM Chamikara Jayalath <
>>>> chamik...@google.com> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I'm looking into adding a Kafka connector to Beam Python SDK. I think
>>>>> this will benefits many Python SDK users and will serve as a good example
>>>>> for recently added Splittable DoFn API (Fn API support which will allow 
>>>>> all
>>>>> runners to use Python Splittable DoFn is in active development).  I 
>>>>> created
>>>>> a document [1] that makes the case for adding this connector and compares
>>>>> the performance of available Python Kafka client libraries. Also I created
>>>>> a POC [2] that illustrates the API and how Python SDF API can be used to
>>>>> implement a Kafka source. I extremely appreciate any feedback related to
>>>>> this.
>>>>>
>>>>> [1]
>>>>> https://docs.google.com/document/d/1ogRS-e-HYYTHsXi_l2zDUUOnvfzEbub3BFkPrYIOawU/edit?usp=sharing
>>>>> [2]
>>>>> https://github.com/chamikaramj/beam/commit/982767b69198579b22522de6794242142d12c5f9
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>
>>>>

Reply via email to