Hi! Thank you for raising these issues. Comments inline.
On Fri, Sep 16, 2016 at 7:19 AM Jean-Baptiste Onofré <[email protected]> wrote:
> Hi all,
>
> as you may know I'm working on different new IOs for Beam.
>
> I have some suggestions that I would like to discuss with you all.
>
> 1/ Sink
> The SDK provides a Sink abstract class. It represents a resource that
> can be written using a Write transform:
>
> p.apply(Write.to(new MySink()));
>
> The Sink creates a Writer, an abstract class where we override the
> open(), write(), and close() methods.
> Today, no IOs use Sink: they directly use a DoFn.
> I fully agree that this is very convenient, but it may look
> inconsistent and can "perturb" users/developers.
>
> That brings me to the second point.
>
> 2/ Source
> Today, an IO's Read apply() method uses a source via Read.from(source).
>
> However, if the Source API is not required (for instance in the case of
> JDBC, where we can't really implement getEstimatedSizeBytes() and
> splitIntoBundles()), it's possible to directly use a DoFn instead of a
> Source.
>
> So again, it can look inconsistent.
>
> Maybe it would make more sense to "force" the usage of Source even if we
> don't leverage all Source features (for instance, in the case of JDBC
> IO, getEstimatedSizeBytes() would return 0L and splitIntoBundles() would
> return a list with a single source).
> The same for Sink: even if a sink can be implemented with a DoFn, it
> would be more consistent to implement it with Sink (or to remove Sink ;)).

I understand where you're coming from: when one wants to implement a
connector, it'd be great if there was an obvious starting point, e.g. "if
I want to read data, I implement a Source; if I want to write data, I
implement a Sink". As a reviewer of your IO PRs, I can also definitely see
how frustrating it is to implement something, say, as a Source, and then
reimplement it without the Source API after discovering that the API is
overkill and the connector can be implemented much more simply - it feels
like wasted work, due to being misled by an API with a tempting name.

However, the Source and Sink APIs are legitimately clumsy to use (this is
why connectors become much simpler when you can avoid using them!), and
I'd feel bad forcing them on users, as well as perpetuating their use
while significantly better alternatives are being developed (Splittable
DoFn; and see Dan Halperin's latest email about sinks). In this light, if
I had to choose between removing Sink and forcing it on users, I'd choose
to remove it.

The real issue is that these classes have very tempting, overloaded names
that mislead users (e.g. "source" means both "a connector that reads data"
and "a subclass of Source"), and, perhaps, the lack of an easily
discoverable document providing guidance on how to develop a connector.

Here's how I think about these APIs and about developing connectors. In
the thoughts below, "source" will mean "a connector that reads data" and
"sink" will mean "a connector that writes data".

- A connector is nothing special - just like any other transform, it does
stuff internally and produces and/or consumes PCollection's. You develop a
connector like any other transform: by hacking it together from ParDo's
and GroupByKey's and other simple primitives.

- There may be a bunch of common patterns; e.g. many sinks will want to do
buffering and write things in batches. When writing bounded data to files,
you probably want to write to a temporary location and finally have a
global rename step. Many sources will want a fusion break between
splitting themselves into parts and reading from the parts. Etc. The SDK
should provide simple, small, orthogonal utilities for these purposes, so
that it's easier for you to hack your connector together. (Rough sketches
of the batching and fusion-break patterns follow below.)
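To make the batching pattern concrete, here's a minimal sketch of a sink
written as a plain DoFn - no Sink API involved. (MyClient and writeBatch()
are invented placeholders for whatever service you're writing to, and the
exact DoFn method signatures may vary with the SDK version.)

  import java.util.ArrayList;
  import java.util.List;
  import org.apache.beam.sdk.transforms.DoFn;

  class BatchedWriteFn extends DoFn<String, Void> {
    private static final int MAX_BATCH_SIZE = 500;
    private final List<String> batch = new ArrayList<>();
    private transient MyClient client;  // invented client for the target system

    @Setup
    public void setup() {
      client = MyClient.create();  // open the connection once per DoFn instance
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
      batch.add(c.element());
      if (batch.size() >= MAX_BATCH_SIZE) {
        flush();  // one RPC per MAX_BATCH_SIZE elements, not one per element
      }
    }

    @FinishBundle
    public void finishBundle() {
      flush();  // don't drop a partial batch at the end of a bundle
    }

    @Teardown
    public void teardown() {
      if (client != null) {
        client.close();
      }
    }

    private void flush() {
      if (!batch.isEmpty()) {
        client.writeBatch(batch);
        batch.clear();
      }
    }
  }

The whole "sink" is then just: myData.apply(ParDo.of(new BatchedWriteFn())).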
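And here's the shape of the fusion-break pattern for a source: split on
one worker, then key/group/ungroup so the runner can't fuse splitting and
reading into a single stage (which would make one worker read everything).
Query, Partition, split() and ReadPartitionFn are invented stand-ins, and
some SDK versions package the middle section as a Reshuffle utility.

  // Stage 1: compute the list of partitions to read (runs on one worker).
  PCollection<Partition> partitions = p
      .apply(Create.of(query))
      .apply(ParDo.of(new DoFn<Query, Partition>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          for (Partition part : split(c.element())) {
            c.output(part);
          }
        }
      }));

  // Fusion break: key by a random number, group, ungroup. The GroupByKey
  // forces a materialization point, so the reads below get redistributed.
  PCollection<Partition> redistributed = partitions
      .apply(ParDo.of(new DoFn<Partition, KV<Integer, Partition>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(KV.of(ThreadLocalRandom.current().nextInt(), c.element()));
        }
      }))
      .apply(GroupByKey.<Integer, Partition>create())
      .apply(Values.<Iterable<Partition>>create())
      .apply(Flatten.<Partition>iterables());

  // Stage 2: read each partition in parallel.
  PCollection<MyRecord> records =
      redistributed.apply(ParDo.of(new ReadPartitionFn()));

Note that nothing here needs the Source API - it's ParDo's and a GroupByKey.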
- Runners may implement advanced features, such as liquid sharding,
progress or backlog signals, size estimation, etc., that require
collaboration from your connector. This is heavy artillery, and you almost
always shouldn't even need to know it exists, unless you feel that for
your connector implementing these features is 1) possible and 2) worth the
trouble. In that case, you use a special advanced API. Right now, all of
these advanced runner features happen to be specific to sources, and
happen to be encapsulated in an advanced API with a deceptively
accessible-sounding name, "Source". But that needn't be the case (and
Splittable DoFn makes this *not* be the case - it moves these advanced
features into an optional feature of DoFn's).

Ideally, we'd have a public document - a "guide to developing connectors".
It would contain best practices, common patterns, pitfalls, and so on -
e.g. "batch your write RPCs", "don't forget a fusion break", "remember to
close resources", "remember about retries", "here's what a concise builder
pattern looks like", "if you're writing bounded data to files, consider
using this utility transform", "here are some low-level utilities for
globbing/opening/renaming/... files on GCS", etc. And yes, there would be
no Sink API at all. And it would include a few examples of connectors
developed according to these practices.

Somewhere toward the end, the document would make a note about advanced
features that some sources can support and runners can hook into, and
would say there's an API for that if you *really* know what you're doing.
The Source class would be renamed to something scarier, like
"Parallel{Bounded,Unbounded}Input". When Splittable DoFn is ready, it
would be mentioned in the same section too.

What do you think?

> 3/ Type Converter
> Today, each IO represents elements in the PCollection however it wants.
> For instance, the following pipeline won't compile as-is:
>
> p.apply(JmsIO.Read()...) // returns a PCollection<JmsRecord>
> .apply(JdbcIO.Write()...) // expects a PCollection<JdbcDataRecord>
>
> The user will have to "convert" the PCollection<JmsRecord> into a
> PCollection<JdbcDataRecord>.
>
> Maybe it makes sense to provide a Converter in the IOs and use a kind of
> schema and canonical format (optionally), for instance based on Avro.

I think this point is not specific to IO. It sounds like you want general
support for data objects that require less hassle to manipulate in a
pipeline than plain Java objects, or utilities that would make
manipulating Java objects less of a hassle (e.g. something like: automatic
DoFn's for projecting a subset of fields, or filtering on the contents of
a field, etc). This sounds interesting, and I'm curious to hear your ideas
on what this could look like!
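For reference, here's the kind of glue users have to write today between
those two IOs - a hand-written conversion ParDo. (The JdbcDataRecord
accessors are invented for the example, the JmsRecord getters are
schematic, and the /* ... */ placeholders stand for whatever configuration
each IO needs.)

  p.apply(JmsIO.read()/* ... */)                  // PCollection<JmsRecord>
      .apply(ParDo.of(new DoFn<JmsRecord, JdbcDataRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          JmsRecord jms = c.element();
          // Field-by-field conversion: exactly the boilerplate that a
          // schema / canonical format could generate for the user.
          JdbcDataRecord row = new JdbcDataRecord();
          row.setColumn("payload", jms.getPayload());
          row.setColumn("timestamp", jms.getJmsTimestamp());
          c.output(row);
        }
      }))
      .apply(JdbcIO.write()/* ... */);            // PCollection<JdbcDataRecord>

A schema-aware converter could generate that middle ParDo - but it would
be just as useful between any two transforms, not only between IOs.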
> I added this point in the "Technical Vision" a while ago, but I think it
> would simplify the way of writing pipelines.
>
> Thoughts ?
>
> Regards
> JB
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com