Thank you for raising these issues. Comments inline.

On Fri, Sep 16, 2016 at 7:19 AM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:

> Hi all,
> as you may know I'm working on different new IOs for Beam.
> I have some suggestions that I would like to discuss with you all.
> 1/ Sink
> The SDK provides a Sink abstract class. It represents a resource that
> can be written using a Write transform:
>    p.apply(Write.to(new MySink()));
> The Sink creates a Writer. A Writer is an abstract class where we
> override the open(), write(), close() methods.
> Today, no IOs use Sink: they directly use a DoFn.
> I fully agree that it's very convenient to implement a Sink but it may
> appear like non consistent and can "perturb" the users/developers.
> It comes me to the second point.
> 2/ Source
> Today, a IO Read apply() method use a source via Read.from(source).
> However, if the source API is not required (for instance in the case of
> JDBC where we can't really implement getEstimatedSizeBytes() and
> splitIntoBundles()), it's possible to directly use a DoFn instead of a
> Source.
> So again, it could appear like non consistent.
> Maybe it would make more sense to "force" the usage of Source even if we
> don't leverage all Source features (for instance, in the case of JDBC
> IO, getEstimatedSizeBytes() will return 0L and splitIntoBundles() will
> return a list with a single source).
> The same for Sink: even if a Sink can be implemented with DoFn, it would
> be more consistent to implement it with Sink (or remove Sink ;)).

I understand where you're coming from: when one wants to implement a
connector, it'd be great if there was an obvious starting point: e.g. "if I
want to read data, I implement a Source; if I want to write data, I
implement a Sink". As a reviewer of your IO PRs, I can also definitely see
how frustrating it is to implement something, say, as a Source, and then
reimplement it without the Source API after discovering that the API is
overkill and the connector can be implemented much more simply. It feels
like wasted work caused by being misled by an API with a tempting name.

However, the Source and Sink APIs are legitimately clumsy to use (this is
why connectors become much simpler when you can avoid using them!), and I'd
feel bad forcing them on users, as well as perpetuating their use while
significantly better alternatives are being developed (Splittable DoFn; and
see Dan Halperin's latest email about sinks). In this light, if I had to
choose between removing Sink and forcing it on users, I'd choose to remove
it.

The real issue here is that these APIs have very tempting names with
overloaded meanings that mislead users (e.g. "source" means both "a
connector that reads data" and "a subclass of Source"); and, perhaps, the
lack of an obviously accessible document providing guidance on how to
develop a connector.

Here's how I think about these APIs and about developing connectors.
In the thoughts below, "source" will mean "a connector that reads data" and
"sink" will mean "a connector that writes data".
- A connector is nothing special - just like any other transform, it does
stuff internally and produces and/or consumes PCollection's. You develop a
connector like any other transform: by hacking it together from ParDo's and
GroupByKey's and other simple primitives.
- There may be a bunch of common patterns; e.g. many sinks will want to do
buffering and write things in batches. When writing bounded data to files,
you probably want to write to a temporary location and finally have a
global rename step. Many sources will want to have a fusion break between
splitting themselves in parts and reading from the parts. Etc. The SDK
should provide simple, small, orthogonal utilities for these purposes, so
that it's easier for you to hack your connector together.
- Runners may implement advanced features, such as liquid sharding,
progress or backlog signals, size estimation, etc., that require
collaboration from your connector. This is heavy artillery, and you should
almost never need to know it exists, unless you feel that implementing
these features for your connector is 1) possible and 2) worth the
trouble. In that case, you use a special advanced API. Right now, all of
these advanced runner features happen to be specific to sources, and happen
to be encapsulated in an advanced API with a deceptively
accessible-sounding name, "Source". But that needn't be the case (and
Splittable DoFn makes this *not* be the case - it moves these advanced
features into an optional feature of DoFn's).
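
To make the "buffer and write in batches" pattern from the second bullet
concrete, here is a minimal sketch in plain Java. It deliberately does not
use any Beam classes: BatchingWriter, add() and flush() are hypothetical
names made up for illustration, and the assumption is that a sink's DoFn
would call add() for every element and flush() when a bundle finishes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical helper, not part of the Beam SDK: buffers elements and hands
 * them to a caller-supplied batch writer whenever the buffer fills up.
 */
class BatchingWriter<T> {
  private final int batchSize;
  private final Consumer<List<T>> writeBatch; // e.g. one write RPC per batch
  private final List<T> buffer = new ArrayList<>();

  BatchingWriter(int batchSize, Consumer<List<T>> writeBatch) {
    if (batchSize <= 0) {
      throw new IllegalArgumentException("batchSize must be positive");
    }
    this.batchSize = batchSize;
    this.writeBatch = writeBatch;
  }

  /** Call once per element; writes a batch as soon as the buffer is full. */
  void add(T element) {
    buffer.add(element);
    if (buffer.size() >= batchSize) {
      flush();
    }
  }

  /** Call at the end of a bundle so that a partial batch is not lost. */
  void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    // Pass a copy so the receiver may keep the list after we clear the buffer.
    writeBatch.accept(new ArrayList<>(buffer));
    buffer.clear();
  }
}
```

A sink built this way would still need to think about retries and closing
resources; the sketch only covers the buffering part.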

Ideally, we'd have a public document - "guide on developing connectors". It
would contain best practices, common patterns and pitfalls, and such - e.g.
"batch your write RPCs", "don't forget a fusion break", "remember to close
resources", "remember about retries", "here's what a concise builder pattern
looks like", "if you're writing bounded data to files, consider using this
utility transform", "here are some low-level utilities for
globbing/opening/renaming/... files on GCS" etc. And yes, there would be no
Sink API at all. And a few examples of connectors developed according to
these guidelines.

Somewhere toward the end, the document would make a note about advanced
features that some sources can support and runners can hook into, and would
say there's an API for that if you *really* know what you're doing. The
Source class would be renamed to something more scary, like
"Parallel{Bounded,Unbounded}Input". When Splittable DoFn is ready, it would
be mentioned in the same section too.

What do you think?

> 3/ Type Converter
> Today each IO represent an element in the PCollection as he wants.
> For instance, the following pipeline won't compile straight forward:
> p.apply(JmsIO.Read()...) // returns a PCollection<JmsRecord>
>   .apply(JdbcIO.Write()...) // expects PCollection<JdbcDataRecord>
> The user will have to "convert" PCollection<JmsRecord> as
> PCollection<JdbcDataRecord>.
> Maybe it makes sense to provide a Converter in the IOs and use kind of
> schema and canonical format (optionally), for instance based on Avro.

I think this point is not specific to IO. Sounds like you want general
support for data objects that require less hassle to manipulate in a
pipeline than plain Java objects. Or utilities that would make manipulating
Java objects less of a hassle (e.g. something like: automatic DoFn's for
projecting a subset of fields, or filtering on contents of a field, etc).
This sounds interesting and I'm curious to hear your ideas on what this
could look like!
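
Just to illustrate what I mean by "projecting a subset of fields", here is
a rough sketch in plain Java. It is not a proposal for an actual Beam API:
FieldProjection, field() and apply() are hypothetical names, and the
assumption is that something like apply() would be the body of an
automatically generated DoFn.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

/**
 * Hypothetical utility, not part of the Beam SDK: describes which fields to
 * extract from a Java object and produces them as a name-to-value map.
 */
class FieldProjection<T> {
  // Insertion order is preserved so output fields appear in declared order.
  private final Map<String, Function<T, ?>> getters = new LinkedHashMap<>();

  /** Registers a named field together with the function that extracts it. */
  FieldProjection<T> field(String name, Function<T, ?> getter) {
    getters.put(name, getter);
    return this;
  }

  /** Extracts only the registered fields from the given object. */
  Map<String, Object> apply(T input) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (Map.Entry<String, Function<T, ?>> e : getters.entrySet()) {
      out.put(e.getKey(), e.getValue().apply(input));
    }
    return out;
  }
}
```

With real records one would pass getters of the record type (for example,
whatever accessors JmsRecord exposes) instead of the String methods a quick
test would use.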

> I added this point in the "Technical Vision" while ago, but I think it
> would simplify the way of writing pipelines.
> Thoughts ?
> Regards
> JB
> --
> Jean-Baptiste Onofré
> jbono...@apache.org
> http://blog.nanthrax.net
> Talend - http://www.talend.com
