The premise of URN + payload is that you can establish a spec. A native
override still needs to meet the spec - it may still require some
compensating code. Worrying about weird differences between runners seems
more about worrying that an adequate spec cannot be determined.

Runners will already invoke the SDF differently, so users treating every
detail of some implementation as the spec are doomed.

Kenn

On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:

>
>
> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>>
>>
>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <hero...@google.com> wrote:
>>
>>> > Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>> that
>>> > fails loudly if it's actually called in the short term rather than a
>>> full
>>> > Python implementation.
>>>
>>> For configurable runner-native IO, for now, I think it is reasonable to
>>> use a URN + special data payload directly without a KafkaDoFn -- assuming
>>> it's a portable pipeline. That's what we do in Go for PubSub-on-Dataflow
>>> and something similar would work for Kafka-on-Flink as well. I agree that
>>> non-native alternative implementation is desirable, but if one is not
>>> present we should IMO rather fail at job submission instead of at runtime.
>>> I could imagine connectors intrinsic to an execution engine where
>>> non-native implementations are not possible.
>>>
>>
>> I think, in this case, KafkaDoFn can be a SDF that would expand similar
>> to any other SDF by default (initial splitting, GBK, and a map-task
>> equivalent, for example) but a runner (Flink in this case) will be free to
>> override it with an runner-native implementation if desired. I assume
>> runner will have a chance to perform this override before the SDF expansion
>> (which is not fully designed yet). Providing a separate source/sink
>> transforms for Flink native Kafka will be an option as well, but that will
>> be less desirable from a Python user API perspective.
>>
>
> Are we sure that the internal SDF will provide the same functionality as
> the native one? What if the Kafka SDF is in the middle of a pipeline - can
> Flink support that? Having a separate transform for the Flink native source
> might be a better user experience than having one that changes its behavior
> in strange ways depending on the runner.
>
>
>
>>
>>
>>>
>>>
>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <t...@apache.org> wrote:
>>>>
>>>> > Hi Cham,
>>>>
>>>> > Thanks for the feedback!
>>>>
>>>> > I should have probably clarified that my POC and questions aren't
>>>> specific to Kafka as source, but pretty much any other source/sink that
>>>> we
>>>> internally use as well. We have existing Flink pipelines that are
>>>> written
>>>> in Java and we want to use the same connectors with the Python SDK on
>>>> top
>>>> of the already operationalized Flink stack. Therefore, portability
>>>> isn't a
>>>> concern as much as the ability to integrate is.
>>>>
>>>
>> Thanks for the clarification. Agree that providing runner-native
>> implementations of established source/sinks can be can be desirable in some
>> cases.
>>
>>
>>>> > -->
>>>>
>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>> > <chamik...@google.com>
>>>> wrote:
>>>>
>>>> >> Hi Thomas,
>>>>
>>>> >> Seems like we are working on similar (partially) things :).
>>>>
>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <t...@apache.org> wrote:
>>>>
>>>> >>> I'm working on a mini POC to enable Kafka as custom streaming source
>>>> for a Python pipeline executing on the (in-progress) portable Flink
>>>> runner.
>>>>
>>>> >>> We eventually want to use the same native Flink connectors for
>>>> sources
>>>> and sinks that we also use in other Flink jobs.
>>>>
>>>>
>>>> >> Could you clarify what you mean by same Flink connector ? Do you mean
>>>> that Beam-based and non-Beam-based versions of Flink will use the same
>>>> Kafka connector implementation ?
>>>>
>>>>
>>>> > The native Flink sources as shown in the example below, not the Beam
>>>> KafkaIO or other Beam sources.
>>>>
>>>>
>>>>
>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>> reading
>>>> from Kafka and a Python lambda logging the value. The code is here:
>>>>
>>>>
>>>>
>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>
>>>>
>>>>
>>>> >>> I'm looking for feedback/opinions on the following items in
>>>> particular:
>>>>
>>>> >>> * Enabling custom translation on the Flink portable runner (custom
>>>> translator could be loaded with ServiceLoader, additional translations
>>>> could also be specified as job server configuration, pipeline option,
>>>> ...)
>>>>
>>>> >>> * For the Python side, is what's shown in the commit the recommended
>>>> way to define a custom transform (it would eventually live in a reusable
>>>> custom module that pipeline authors can import)? Also, the example does
>>>> not
>>>> have the configuration part covered yet..
>>>>
>>>>
>>>> >> The only standard unbounded source API offered by Python SDK is the
>>>> Splittable DoFn API. This is the part I'm working on. I'm trying to add
>>>> a
>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>> comparing
>>>> different Kafka Python client libraries. Will share more information on
>>>> this soon.
>>>>
>>>> >> I understand this might not be possible in all cases and we might
>>>> want
>>>> to consider adding a native source/sink implementations. But this will
>>>> result in the implementation being runner-specific (each runner will
>>>> have
>>>> to have it's own source/sink implementation). So I think we should try
>>>> to
>>>> add connector implementations to Beam using the standard API whenever
>>>> possible. We also have plans to implement support for cross SDK
>>>> transforms
>>>> in the future (so that we can utilize Java implementation from Python
>>>> for
>>>> example) but we are not there yet and we might still want to implement a
>>>> connector for a given SDK if there's good client library support.
>>>>
>>>>
>>>> > It is great that the Python SDK will have connectors that are written
>>>> in
>>>> Python in the future, but I think it is equally if not more important
>>>> to be
>>>> able to use at least the Java Beam connectors with Python SDK (and any
>>>> other non-Java SDK). Especially in a fully managed environment it
>>>> should be
>>>> possible to offer this to users in a way that is largely transparent. It
>>>> takes significant time and effort to mature connectors and I'm not sure
>>>> it
>>>> is realistic to repeat that for all external systems in multiple
>>>> languages.
>>>> Or, to put it in another way, it is likely that instead of one over time
>>>> rock solid connector per external system there will be multiple less
>>>> mature
>>>> implementations. That's also the reason we internally want to use the
>>>> Flink
>>>> native connectors - we know what they can and cannot do and want to
>>>> leverage the existing investment.
>>>>
>>>> There are two related issues here: how to specify transforms (such as
>>>> sources) in a language-independent manner, and how specific runners can
>>>> recognize and run them, but URNs solve both. For  this we use URNs: the
>>>> composite ReadFromKafka PTransform (that consists of a Impulse +
>>>> SDF(KafkaDoFn)) to encodes to a URN with an attached payload that fully
>>>> specifies this read. (The KafkaDoFn could similarly have a URN and
>>>> payload.) A runner that understands these URNs is free to make any
>>>> (semantically-equivalent) substitutions it wants for this transform.
>>>>
>>>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>>> that
>>>> fails loudly if it's actually called in the short term rather than a
>>>> full
>>>> Python implementation. Eventually, we would like to be able to call out
>>>> to
>>>> another SDK to expand full transforms (e.g. more complicated ones like
>>>> BigQueryIO).
>>>>
>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>> considers the message value and uses the byte coder that both sides
>>>> understand. If I wanted to pass on the key and possibly other metadata
>>>> to
>>>> the Python transform (similar to KafkaRecord from Java KafkaIO), then a
>>>> specific coder is needed. Such coder could be written using protobuf,
>>>> Avro
>>>> etc, but it would also need to be registered.
>>>>
>>>>
>>>> >> I think this requirement goes away if we implement Kafka in Python
>>>> SDK.
>>>>
>>>> > Wouldn't this be needed for any cross language pipeline? Or rather any
>>>> that isn't only using PCollection<byte[]>? Is there a language agnostic
>>>> encoding for KV<?,?>, for example?
>>>>
>>>> Yes, Coders are also specified by URN (+components and/or payload), and
>>>> there are a couple of standard ones, including KV. See
>>>>
>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>> This is not a closed set.
>>>>
>>>> - Robert
>>>>
>>>

Reply via email to