On Tue, Apr 24, 2018 at 1:14 PM Thomas Weise <t...@apache.org> wrote:

> Hi Cham,

> Thanks for the feedback!

> I should probably have clarified that my POC and questions aren't
specific to Kafka as a source; they apply just as much to the other
sources/sinks we use internally. We have existing Flink pipelines that are
written in Java, and we want to use the same connectors with the Python SDK
on top of the already operationalized Flink stack. Therefore, portability
isn't as much of a concern as the ability to integrate is.


> On Tue, Apr 24, 2018 at 12:00 PM, Chamikara Jayalath <chamik...@google.com> wrote:

>> Hi Thomas,

>> Seems like we are working on similar (partially) things :).

>> On Tue, Apr 24, 2018 at 9:03 AM Thomas Weise <t...@apache.org> wrote:

>>> I'm working on a mini POC to enable Kafka as custom streaming source
for a Python pipeline executing on the (in-progress) portable Flink runner.

>>> We eventually want to use the same native Flink connectors for sources
and sinks that we also use in other Flink jobs.


>> Could you clarify what you mean by the same Flink connector? Do you mean
that Beam-based and non-Beam-based Flink jobs will use the same Kafka
connector implementation?


> The native Flink sources as shown in the example below, not the Beam
KafkaIO or other Beam sources.



>>> I got a simple example to work with the FlinkKafkaConsumer010 reading
from Kafka and a Python lambda logging the value. The code is here:


https://github.com/tweise/beam/commit/79b682eb4b83f5b9e80f295464ebf3499edb1df9
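(For context, a rough sketch of what the Python side of such a pipeline
might look like; this is not the actual commit, and KafkaRead is a
hypothetical placeholder transform that the Flink job server would
recognize and replace with the native FlinkKafkaConsumer010.)

import logging

import apache_beam as beam

def run():
  # Pipeline options for the portable Flink runner are omitted here.
  with beam.Pipeline() as p:
    (p
     | 'ReadFromKafka' >> KafkaRead(topic='beam')  # hypothetical transform
     | 'LogValue' >> beam.Map(lambda v: logging.info('value: %s', v)))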



>>> I'm looking for feedback/opinions on the following items in particular:

>>> * Enabling custom translation on the Flink portable runner (a custom
translator could be loaded with ServiceLoader; additional translations
could also be specified as job server configuration, a pipeline option, ...)

>>> * For the Python side, is what's shown in the commit the recommended
way to define a custom transform (it would eventually live in a reusable
custom module that pipeline authors can import)? Also, the example does not
cover the configuration part yet.


>> The only standard unbounded source API offered by the Python SDK is the
Splittable DoFn API. This is the part I'm working on. I'm trying to add a
Kafka connector for the Beam Python SDK using the SDF API. The JIRA is
https://issues.apache.org/jira/browse/BEAM-3788. I'm currently comparing
different Kafka Python client libraries. Will share more information on
this soon.
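(To make the SDF approach concrete, here is a very rough sketch. The
make_consumer/fetch calls are hypothetical placeholders for whichever Kafka
client library ends up being chosen, and watermark estimation, checkpointing,
and the unbounded end offset are glossed over.)

import apache_beam as beam
from apache_beam.io.restriction_trackers import (
    OffsetRange, OffsetRestrictionTracker)
from apache_beam.transforms.core import RestrictionProvider

class PartitionOffsetProvider(RestrictionProvider):
  def initial_restriction(self, topic_partition):
    # A real implementation would look up start/end offsets; this large end
    # offset stands in for "read forever".
    return OffsetRange(0, 2**63 - 1)

  def create_tracker(self, restriction):
    return OffsetRestrictionTracker(restriction)

  def restriction_size(self, topic_partition, restriction):
    return restriction.size()

class KafkaPartitionReadDoFn(beam.DoFn):
  # Processes one (topic, partition) element, claiming offsets as it reads.
  def process(self, topic_partition,
              tracker=beam.DoFn.RestrictionParam(PartitionOffsetProvider())):
    consumer = make_consumer(topic_partition)   # hypothetical client setup
    for offset, record in fetch(consumer):      # hypothetical fetch loop
      if not tracker.try_claim(offset):
        return
      yield record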

>> I understand this might not be possible in all cases and we might want
to consider adding native source/sink implementations. But this will
result in the implementation being runner-specific (each runner will have
to have its own source/sink implementation). So I think we should try to
add connector implementations to Beam using the standard API whenever
possible. We also have plans to implement support for cross-SDK transforms
in the future (so that we can utilize the Java implementation from Python,
for example), but we are not there yet and we might still want to implement
a connector for a given SDK if there's good client library support.


> It is great that the Python SDK will have connectors written in Python in
the future, but I think it is equally if not more important to be able to
use at least the Java Beam connectors with the Python SDK (and any other
non-Java SDK). Especially in a fully managed environment, it should be
possible to offer this to users in a way that is largely transparent. It
takes significant time and effort to mature connectors, and I'm not sure it
is realistic to repeat that for all external systems in multiple languages.
Or, to put it another way, instead of one connector per external system
becoming rock solid over time, there will likely be multiple less mature
implementations. That's also the reason we internally want to use the Flink
native connectors - we know what they can and cannot do and want to
leverage the existing investment.

There are two related issues here: how to specify transforms (such as
sources) in a language-independent manner, and how specific runners can
recognize and run them. URNs solve both: the composite ReadFromKafka
PTransform (which consists of an Impulse + SDF(KafkaDoFn)) encodes to a URN
with an attached payload that fully specifies the read. (The KafkaDoFn
could similarly have a URN and payload.) A runner that understands these
URNs is free to make any (semantically equivalent) substitutions it wants
for this transform.
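To illustrate, a sketch only (not actual Beam code): the URN string, payload
format, and the exact registration hooks (PTransform.register_urn /
from_runner_api_parameter) are assumptions here.

import apache_beam as beam

KAFKA_READ_URN = 'example:kafka:read:v1'  # hypothetical URN

class ReadFromKafka(beam.PTransform):
  def __init__(self, topic):
    self._topic = topic

  def expand(self, pbegin):
    # KafkaDoFn can be a placeholder (see the sketch further below); a runner
    # that understands KAFKA_READ_URN substitutes its own native source for
    # this whole composite.
    return pbegin | beam.Impulse() | beam.ParDo(KafkaDoFn(self._topic))

  def to_runner_api_parameter(self, context):
    # The payload must fully specify the read so a runner (or another SDK)
    # can reconstruct or replace it; a real implementation would likely use
    # a proto rather than raw bytes.
    return KAFKA_READ_URN, self._topic.encode('utf-8')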

Note that a KafkaDoFn still needs to be provided, but in the short term it
could be a DoFn that fails loudly if it's actually called, rather than a
full Python implementation. Eventually, we would like to be able to call
out to another SDK to expand full transforms (e.g. more complicated ones
like BigQueryIO).
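A minimal sketch of such a placeholder, assuming the hypothetical
ReadFromKafka composite above:

import apache_beam as beam

class KafkaDoFn(beam.DoFn):
  def __init__(self, topic):
    self._topic = topic

  def process(self, element):
    # Never expected to run: a runner that recognizes the enclosing
    # transform's URN replaces the composite with a native source.
    raise NotImplementedError(
        'KafkaDoFn for topic %r is a placeholder; the runner should have '
        'substituted a native Kafka read.' % self._topic)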

>>> * Cross-language coders: In this example the Kafka source only
considers the message value and uses the byte coder that both sides
understand. If I wanted to pass on the key and possibly other metadata to
the Python transform (similar to KafkaRecord from the Java KafkaIO), then a
specific coder is needed. Such a coder could be written using protobuf,
Avro, etc., but it would also need to be registered.


>> I think this requirement goes away if we implement Kafka in the Python SDK.

> Wouldn't this be needed for any cross-language pipeline? Or rather, any
that isn't only using PCollection<byte[]>? Is there a language-agnostic
encoding for KV<?,?>, for example?

Yes, Coders are also specified by URN (+components and/or payload), and
there are a couple of standard ones, including KV. See
https://github.com/apache/beam/blob/master/model/pipeline/src/main/resources/org/apache/beam/model/common_urns.md
This is not a closed set.
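For instance (a sketch, assuming the Java side hands over
KV<byte[], byte[]>): the Python SDK's TupleCoder over two BytesCoders
corresponds to the standard KV-of-bytes coder in the portable
representation, so both SDKs can agree on the wire format without a custom
coder.

from apache_beam import coders

kafka_record_coder = coders.TupleCoder(
    (coders.BytesCoder(), coders.BytesCoder()))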

- Robert
