On Wed, Apr 25, 2018 at 6:57 PM Reuven Lax <re...@google.com> wrote:

> On Wed, Apr 25, 2018 at 6:51 PM Kenneth Knowles <k...@google.com> wrote:
>
>> The premise of URN + payload is that you can establish a spec. A native
>> override still needs to meet the spec - it may still require some
>> compensating code. Worrying about weird differences between runners seems
>> more about worrying that an adequate spec cannot be determined.
>>
>
> My point exactly. An SDF-based KafkaIO can run in the middle of a pipeline.
> E.g. we could have TextIO producing a list of topics, and the SDF KafkaIO
> run after that on this dynamic (not known until runtime) list of topics. If
> the native Flink source doesn't work this way, then it doesn't share the
> same spec and should have a different URN.
>

Agree that if they cannot share the same spec, SDF and native transforms
warrant different URNs. Native Kafka might be able to support a PCollection
of topics/partitions as an input, though, by utilizing the underlying native
Flink Kafka connector as a library. On the other hand, we might decide to
expand SDF-based ParDos into other transforms before a runner gets a
chance to override, in which case this kind of replacement will not be
possible.
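
To make the trade-off concrete, here is a minimal sketch in plain Python
(no Beam APIs) of how a runner might decide, at job submission, between a
native override and the default SDF expansion. Everything in it is
hypothetical and for illustration only: the URN string, TransformSpec,
plan(), and the expand_sdf_first flag are not part of any existing
interface.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Tuple

# Hypothetical URN, used only for this illustration.
KAFKA_READ_URN = "example:transform:kafka_read:v1"


@dataclass(frozen=True)
class TransformSpec:
    urn: str
    topics: Tuple[str, ...]        # stands in for the payload
    has_sdf_fallback: bool = True  # did the SDK ship a portable SDF?


Override = Callable[[TransformSpec], str]


def plan(spec: TransformSpec,
         overrides: Dict[str, Override],
         expand_sdf_first: bool = False) -> str:
    """Decide at submission time how a runner would execute `spec`."""
    # If SDFs are expanded before the runner sees the composite,
    # the runner never gets a chance to match on the URN.
    override = None if expand_sdf_first else overrides.get(spec.urn)
    if override is not None:
        return override(spec)
    if spec.has_sdf_fallback:
        return "sdf: initial split -> GBK -> process " + ",".join(spec.topics)
    # Neither a native override nor a portable fallback: reject early.
    raise ValueError(f"no implementation for {spec.urn}; rejecting at submission")


def flink_native_kafka(spec: TransformSpec) -> str:
    # Placeholder for a translation to a native Flink Kafka source.
    return "native-flink-kafka " + ",".join(spec.topics)


FLINK_OVERRIDES: Dict[str, Override] = {KAFKA_READ_URN: flink_native_kafka}
```

With FLINK_OVERRIDES the spec is planned as the native source, with an
empty override table it falls back to the SDF expansion, and with
expand_sdf_first=True the override is skipped even though it is
registered, which is the case described above.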

Thanks,
Cham


>
>> Runners will already invoke the SDF differently, so users treating every
>> detail of some implementation as the spec are doomed.
>>
>> Kenn
>>
>> On Wed, Apr 25, 2018, 17:04 Reuven Lax <re...@google.com> wrote:
>>
>>>
>>>
>>> On Tue, Apr 24, 2018 at 5:52 PM Chamikara Jayalath <chamik...@google.com>
>>> wrote:
>>>
>>>>
>>>>
>>>> On Tue, Apr 24, 2018 at 3:44 PM Henning Rohde <hero...@google.com>
>>>> wrote:
>>>>
>>>>> > Note that a KafkaDoFn still needs to be provided, but could be a
>>>>> DoFn that
>>>>> > fails loudly if it's actually called in the short term rather than a
>>>>> full
>>>>> > Python implementation.
>>>>>
>>>>> For configurable runner-native IO, for now, I think it is reasonable
>>>>> to use a URN + special data payload directly without a KafkaDoFn --
>>>>> assuming it's a portable pipeline. That's what we do in Go for
>>>>> PubSub-on-Dataflow and something similar would work for Kafka-on-Flink as
>>>>> well. I agree that a non-native alternative implementation is desirable, but
>>>>> if one is not present we should IMO rather fail at job submission instead
>>>>> of at runtime. I could imagine connectors intrinsic to an execution engine
>>>>> where non-native implementations are not possible.
>>>>>
>>>>
>>>> I think, in this case, KafkaDoFn can be an SDF that would expand similarly
>>>> to any other SDF by default (initial splitting, GBK, and a map-task
>>>> equivalent, for example) but a runner (Flink in this case) will be free to
>>>> override it with a runner-native implementation if desired. I assume the
>>>> runner will have a chance to perform this override before the SDF expansion
>>>> (which is not fully designed yet). Providing separate source/sink
>>>> transforms for Flink native Kafka will be an option as well, but that will
>>>> be less desirable from a Python user API perspective.
>>>>
>>>
>>> Are we sure that the internal SDF will provide the same functionality as
>>> the native one? What if the Kafka SDF is in the middle of a pipeline - can
>>> Flink support that? Having a separate transform for the Flink native source
>>> might be a better user experience than having one that changes its behavior
>>> in strange ways depending on the runner.
>>>
>>>
>>>
>>>>
>>>>
>>>>>
>>>>>
>>>>> On Tue, Apr 24, 2018 at 3:09 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>> > Hi Cham,
>>>>>>
>>>>>> > Thanks for the feedback!
>>>>>>
>>>>>> > I should have probably clarified that my POC and questions aren't
>>>>>> specific to Kafka as source, but pretty much any other source/sink
>>>>>> that we
>>>>>> internally use as well. We have existing Flink pipelines that are
>>>>>> written
>>>>>> in Java and we want to use the same connectors with the Python SDK on
>>>>>> top
>>>>>> of the already operationalized Flink stack. Therefore, portability
>>>>>> isn't a
>>>>>> concern as much as the ability to integrate is.
>>>>>>
>>>>>
>>>> Thanks for the clarification. Agree that providing runner-native
>>>> implementations of established sources/sinks can be desirable in some
>>>> cases.
>>>>
>>>>
>>>>>> > -->
>>>>>>
>>>>>> > On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath
>>>>>> > <chamik...@google.com>
>>>>>> wrote:
>>>>>>
>>>>>> >> Hi Thomas,
>>>>>>
>>>>>> >> Seems like we are working on (partially) similar things :).
>>>>>>
>>>>>> >> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <t...@apache.org>
>>>>>> wrote:
>>>>>>
>>>>>> >>> I'm working on a mini POC to enable Kafka as a custom streaming
>>>>>> source
>>>>>> for a Python pipeline executing on the (in-progress) portable Flink
>>>>>> runner.
>>>>>>
>>>>>> >>> We eventually want to use the same native Flink connectors for
>>>>>> sources
>>>>>> and sinks that we also use in other Flink jobs.
>>>>>>
>>>>>>
>>>>>> >> Could you clarify what you mean by the same Flink connector? Do you
>>>>>> mean
>>>>>> that Beam-based and non-Beam-based versions of Flink will use the same
>>>>>> Kafka connector implementation?
>>>>>>
>>>>>>
>>>>>> > The native Flink sources as shown in the example below, not the Beam
>>>>>> KafkaIO or other Beam sources.
>>>>>>
>>>>>>
>>>>>>
>>>>>> >>> I got a simple example to work with the FlinkKafkaConsumer010
>>>>>> reading
>>>>>> from Kafka and a Python lambda logging the value. The code is here:
>>>>>>
>>>>>>
>>>>>>
>>>>>> https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
>>>>>>
>>>>>>
>>>>>>
>>>>>> >>> I'm looking for feedback/opinions on the following items in
>>>>>> particular:
>>>>>>
>>>>>> >>> * Enabling custom translation on the Flink portable runner (custom
>>>>>> translator could be loaded with ServiceLoader, additional translations
>>>>>> could also be specified as job server configuration, pipeline option,
>>>>>> ...)
>>>>>>
>>>>>> >>> * For the Python side, is what's shown in the commit the
>>>>>> recommended
>>>>>> way to define a custom transform (it would eventually live in a
>>>>>> reusable
>>>>>> custom module that pipeline authors can import)? Also, the example
>>>>>> does not
>>>>>> have the configuration part covered yet.
>>>>>>
>>>>>>
>>>>>> >> The only standard unbounded source API offered by Python SDK is the
>>>>>> Splittable DoFn API. This is the part I'm working on. I'm trying to
>>>>>> add a
>>>>>> Kafka connector for Beam Python SDK using SDF API. JIRA is
>>>>>> https://issues.apache.org/jira/browse/BEAM-3788. I'm currently
>>>>>> comparing
>>>>>> different Kafka Python client libraries. Will share more information
>>>>>> on
>>>>>> this soon.
>>>>>>
>>>>>> >> I understand this might not be possible in all cases and we might
>>>>>> want
>>>>>> to consider adding native source/sink implementations. But this will
>>>>>> result in the implementation being runner-specific (each runner will
>>>>>> have
>>>>>> to have its own source/sink implementation). So I think we should
>>>>>> try to
>>>>>> add connector implementations to Beam using the standard API whenever
>>>>>> possible. We also have plans to implement support for cross SDK
>>>>>> transforms
>>>>>> in the future (so that we can utilize Java implementation from Python
>>>>>> for
>>>>>> example) but we are not there yet and we might still want to
>>>>>> implement a
>>>>>> connector for a given SDK if there's good client library support.
>>>>>>
>>>>>>
>>>>>> > It is great that the Python SDK will have connectors that are
>>>>>> written in
>>>>>> Python in the future, but I think it is equally if not more important
>>>>>> to be
>>>>>> able to use at least the Java Beam connectors with Python SDK (and any
>>>>>> other non-Java SDK). Especially in a fully managed environment it
>>>>>> should be
>>>>>> possible to offer this to users in a way that is largely transparent.
>>>>>> It
>>>>>> takes significant time and effort to mature connectors and I'm not
>>>>>> sure it
>>>>>> is realistic to repeat that for all external systems in multiple
>>>>>> languages.
>>>>>> Or, to put it in another way, it is likely that instead of one over
>>>>>> time
>>>>>> rock solid connector per external system there will be multiple less
>>>>>> mature
>>>>>> implementations. That's also the reason we internally want to use the
>>>>>> Flink
>>>>>> native connectors - we know what they can and cannot do and want to
>>>>>> leverage the existing investment.
>>>>>>
>>>>>> There are two related issues here: how to specify transforms (such as
>>>>>> sources) in a language-independent manner, and how specific runners
>>>>>> can recognize and run them. URNs solve both: the composite
>>>>>> ReadFromKafka PTransform (which consists of an Impulse +
>>>>>> SDF(KafkaDoFn)) encodes to a URN with an attached payload that fully
>>>>>> specifies this read. (The KafkaDoFn could similarly have a URN and
>>>>>> payload.) A runner that understands these URNs is free to make any
>>>>>> (semantically-equivalent) substitutions it wants for this transform.
>>>>>>
>>>>>> Note that a KafkaDoFn still needs to be provided, but could be a DoFn
>>>>>> that
>>>>>> fails loudly if it's actually called in the short term rather than a
>>>>>> full
>>>>>> Python implementation. Eventually, we would like to be able to call
>>>>>> out to
>>>>>> another SDK to expand full transforms (e.g. more complicated ones like
>>>>>> BigQueryIO).
>>>>>>
>>>>>> >>> * Cross-language coders: In this example the Kafka source only
>>>>>> considers the message value and uses the byte coder that both sides
>>>>>> understand. If I wanted to pass on the key and possibly other
>>>>>> metadata to
>>>>>> the Python transform (similar to KafkaRecord from Java KafkaIO), then
>>>>>> a
>>>>>> specific coder is needed. Such a coder could be written using protobuf,
>>>>>> Avro
>>>>>> etc, but it would also need to be registered.
>>>>>>
>>>>>>
>>>>>> >> I think this requirement goes away if we implement Kafka in Python
>>>>>> SDK.
>>>>>>
>>>>>> > Wouldn't this be needed for any cross language pipeline? Or rather
>>>>>> any
>>>>>> that isn't only using PCollection<byte[]>? Is there a language
>>>>>> agnostic
>>>>>> encoding for KV<?,?>, for example?
>>>>>>
>>>>>> Yes, Coders are also specified by URN (+components and/or payload),
>>>>>> and
>>>>>> there are a couple of standard ones, including KV. See
>>>>>>
>>>>>> https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
>>>>>> This is not a closed set.
>>>>>>
>>>>>> - Robert
>>>>>>
>>>>>
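
Robert's point that coders are specified by URN plus components can be
sketched roughly as follows. This is a simplified illustration in plain
Python, not Beam's actual wire format or API: the "example:" URNs, the
dict-based spec, and the length-prefixed byte encoding are assumptions
made for the example. The real URNs are listed in the common_urns.md
document linked above.

```python
import struct
from typing import Any, Dict, Tuple

# Hypothetical URNs, for illustration only.
BYTES_URN = "example:coder:bytes:v1"
KV_URN = "example:coder:kv:v1"


class BytesCoder:
    """Length-prefixed bytes (4-byte big-endian length, then the data)."""

    def encode(self, value: bytes) -> bytes:
        return struct.pack(">I", len(value)) + value

    def decode(self, data: bytes, pos: int = 0) -> Tuple[bytes, int]:
        (length,) = struct.unpack_from(">I", data, pos)
        start = pos + 4
        return data[start:start + length], start + length


class KvCoder:
    """Encodes a (key, value) pair by concatenating component encodings."""

    def __init__(self, key_coder: Any, value_coder: Any) -> None:
        self.key_coder = key_coder
        self.value_coder = value_coder

    def encode(self, kv: Tuple[Any, Any]) -> bytes:
        key, value = kv
        return self.key_coder.encode(key) + self.value_coder.encode(value)

    def decode(self, data: bytes, pos: int = 0) -> Tuple[Tuple[Any, Any], int]:
        key, pos = self.key_coder.decode(data, pos)
        value, pos = self.value_coder.decode(data, pos)
        return (key, value), pos


# Each side (SDK or runner) keeps its own URN -> implementation table.
REGISTRY: Dict[str, Any] = {BYTES_URN: BytesCoder, KV_URN: KvCoder}


def build_coder(spec: Dict[str, Any]) -> Any:
    """Recursively instantiate a coder from a {urn, components} spec."""
    factory = REGISTRY.get(spec["urn"])
    if factory is None:
        raise KeyError(f"unknown coder URN: {spec['urn']}")
    return factory(*[build_coder(c) for c in spec.get("components", [])])


KV_BYTES_SPEC = {
    "urn": KV_URN,
    "components": [{"urn": BYTES_URN}, {"urn": BYTES_URN}],
}
```

Because the spec is just data, a Java runner and a Python SDK could each
resolve the same spec against their own registry, and an unknown URN
fails at lookup rather than silently producing incompatible bytes.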
