Hi Eugene,

thanks for your feedbacks.

See my comments:

1/ I now have a better understanding the logic/steps to write IO and as a IO Developer, it's not really a problem. My concern is more when you have to build an application on top of Beam (like for instance a DSL). Then, we can identify a Read PTransform (as it starts with PBegin) and a Write PTransform (as it ends with PDone). But, it's not so easy to identify and create wrapper based on that.
I'm working on couple of Beam DSL right now, and it's the issue I have.

2/ You are right, the proposal is more generic than just IO and having kind of "canonical" standard Beam format will simplify a lot the pipelines (including the transform steps). We could image having a Avro IndexRow/IndexedRecord (schema) as standard format in the PCollection. We can represent basically anything with it. The IO will have to create PCollection element of this type.


On 09/20/2016 03:59 AM, Eugene Kirpichov wrote:

Thank you for raising these issues. Comments inline.

On Fri, Sep 16, 2016 at 7:19 AM Jean-Baptiste Onofré <j...@nanthrax.net>

Hi all,

as you may know I'm working on different new IOs for Beam.

I have some suggestions that I would like to discuss with you all.

1/ Sink
The SDK provides a Sink abstract class. It represents a resource that
can be written using a Write transform:

   p.apply(Write.to(new MySink()));

The Sink creates a Writer. A Writer is an abstract class where we
override the open(), write(), close() methods.
Today, no IOs use Sink: they directly use a DoFn.
I fully agree that it's very convenient to implement a Sink but it may
appear like non consistent and can "perturb" the users/developers.

It comes me to the second point.

2/ Source
Today, a IO Read apply() method use a source via Read.from(source).

However, if the source API is not required (for instance in the case of
JDBC where we can't really implement getEstimatedSizeBytes() and
splitIntoBundles()), it's possible to directly use a DoFn instead of a

So again, it could appear like non consistent.

Maybe it would make more sense to "force" the usage of Source even if we
don't leverage all Source features (for instance, in the case of JDBC
IO, getEstimatedSizeBytes() will return 0L and splitIntoBundles() will
return a list with a single source).
The same for Sink: even if a Sink can be implemented with DoFn, it would
be more consistent to implement it with Sink (or remove Sink ;)).

I understand what you're coming from: when one wants to implement a
connector, it'd be great if there was an obvious starting point: e.g. "if I
want to read data, I implement a Source; if I want to write data, I
implement a Sink". As a reviewer of your IO PRs, I can also definitely see
how frustrating it is to implement something, say, as a Source, and then
reimplement it without the Source API having discovered that the API is
overkill and the connector can be implemented much simpler - feels like
wasted work due to being mislead by an API with a tempting name.

However, the Source and Sink APIs are legitimately clumsy to use (this is
why connectors become much simpler when you can avoid using them!), and I'd
feel bad forcing them on users, as well as perpetuating their use while
significantly better alternatives are being developed (Splittable DoFn; and
see Dan Halperin's latest email about sinks). In this light, if I had to
choose between removing Sink and forcing it on users, I'd choose to remove

The real issue here is that they have very tempting names, with overloaded
names, that mislead users (e.g. "source" means both "a connector that reads
data" and "a subclass of Source"); and, perhaps, lack of an obviously
accessible document providing guidance on how to develop a connector.

Here's how I think about these APIs and about developing connectors.
In the thoughts below, "source" will mean "a connector that reads data" and
"sink" will mean "a connector that writes data".
- A connector is nothing special - just like any other transform, it does
stuff internally and produces and/or consumes PCollection's. You develop a
connector like any other transform: by hacking it together from ParDo's and
GroupByKey's and other simple primitives.
- There may be a bunch of common patterns; e.g. many sinks will want to do
buffering and write things in batches. When writing bounded data to files,
you probably want to write to a temporary location and finally have a
global rename step. Many sources will want to have a fusion break between
splitting themselves in parts and reading from the parts. Etc. The SDK
should provide simple, small, orthogonal utilities for these purposes, so
that it's easier for you to hack your connector together.
- Runners may implement advanced features, such as liquid sharding,
progress or backlog signals, size estimation, etc., that require
collaboration from your connector. This is heavy artillery and almost
always you shouldn't even need to know it exists, unless you feel like for
your connector implementing these features is 1) possible and 2) worth the
trouble. In that case, you use a special advanced API. Right now, all of
these advanced runner features happen to be specific to sources, and happen
to be encapsulated in an advanced API with a deceptively
accessible-sounding name, "Source". But that needn't be the case (and
Splittable DoFn makes this *not* be the case - it moves these advanced
features into an optional feature of DoFn's).

Ideally, we'd have a public document - "guide on developing connectors". It
would contain best practices, common patterns and pitfalls, and such - e.g.
"batch your write RPCs", "don't forget a fusion break", "remember to close
resources", "remember about retries", "here's how a concise builder pattern
looks like", "if you're writing bounded data to files, consider using this
utility transform", "here are some low-level utilities for
globbing/opening/renaming/... files on GCS" etc. And yes, there would be no
Sink API at all. And a few examples of connectors developed according to
Somewhere toward the end, the document would make a note about advanced
features that some sources can support and runners can hook into, and would
say there's an API for that if you *really* know what you're doing. The
Source class would be renamed to something more scary, like
"Parallel{Bounded,Unbounded}Input". When Splittable DoFn is ready, it would
be mentioned in the same section too.

What do you think?

3/ Type Converter
Today each IO represent an element in the PCollection as he wants.
For instance, the following pipeline won't compile straight forward:

p.apply(JmsIO.Read()...) // returns a PCollection<JmsRecord>
  .apply(JdbcIO.Write()...) // expects PCollection<JdbcDataRecord>

The user will have to "convert" PCollection<JmsRecord> as

Maybe it makes sense to provide a Converter in the IOs and use kind of
schema and canonical format (optionally), for instance based on Avro.

I think this point is not specific to IO. Sounds like you want general
support for data objects that require less hassle to manipulate in a
pipeline than plain Java objects. Or utilities that would make manipulating
Java objects less of a hassle (e.g. something like: automatic DoFn's for
projecting a subset of fields, or filtering on contents of a field, etc).
This sounds interesting and I'm curious to hear your ideas on how this
could look like!

I added this point in the "Technical Vision" while ago, but I think it
would simplify the way of writing pipelines.

Thoughts ?

Jean-Baptiste Onofré
Talend - http://www.talend.com

Jean-Baptiste Onofré
Talend - http://www.talend.com

Reply via email to