On Mon, Sep 19, 2016 at 7:17 PM Jean-Baptiste Onofré <[email protected]> wrote:
> Hi Eugene,
>
> thanks for your feedback.
>
> See my comments:
>
> 1/ I now have a better understanding of the logic/steps to write an IO,
> and as an IO developer it's not really a problem. My concern is more when
> you have to build an application on top of Beam (for instance a DSL).
> Then, we can identify a Read PTransform (as it starts with PBegin) and a
> Write PTransform (as it ends with PDone). But it's not so easy to
> identify them and create wrappers based on that.
> I'm working on a couple of Beam DSLs right now, and that's the issue I have.
>

Hmm, I don't quite understand this paragraph. Why do we need to identify
transforms as "read"/"write", and what wrappers are you talking about?
Also please keep in mind the possibility of transforms that can reasonably be
called "read" but don't start with a PBegin - e.g. if you develop a transform
that reads a PCollection of filenames. Or likewise, "write" but don't end with
a PDone: say, a transform that writes data to a temporary directory and
returns the name of the directory as a single-element PCollection.
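For example (all the names here are made up, just to illustrate the shapes
such transforms could have):

  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.transforms.ParDo;
  import org.apache.beam.sdk.values.PCollection;

  // A "read" that starts from a PCollection of filenames rather than PBegin.
  class ReadAllFiles
      extends PTransform<PCollection<String>, PCollection<String>> {
    @Override
    public PCollection<String> apply(PCollection<String> filenames) {
      return filenames.apply(ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws Exception {
          // Open the file named by c.element() and c.output() its records
          // one by one.
        }
      }));
    }
  }

Symmetrically, the "write to a temporary directory" transform would be a
PTransform<PCollection<T>, PCollection<String>> whose output is the
single-element collection holding the directory name. Neither of these starts
with PBegin or ends with PDone, yet one is clearly a read and the other a
write.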
>
> 2/ You are right, the proposal is more generic than just IO, and having a
> kind of "canonical" standard Beam format would simplify the pipelines a lot
> (including the transform steps).
> We could imagine having an Avro IndexedRecord (schema) as the standard
> format in the PCollection. We can represent basically anything with it.
> The IOs would have to create PCollection elements of this type.
>

I think I would prefer if the Beam programming model stayed lightweight and
continued to do just one job - processing the data - while allowing data to
be represented in whichever way is reasonable depending on the context.
Trying to make Beam *also* be a universal data representation format
appropriate for all cases seems unnecessary. E.g. BigtableIO.Read should
return the Bigtable type Row, BigtableIO.Write should take Mutation objects,
and MongoDbIO.Read/Write should use org.bson.Document (btw, right now they
use String - I know I approved that in review, but perhaps it should be
changed).
However, I acknowledge that there are patterns in data processing that could
benefit from having a structured representation of data (e.g. better
optimization opportunities for the runner), so perhaps Beam could *also*
provide such a representation - e.g. see how Spark does data frames and maps
them onto Java objects in a lightweight way:
http://spark.apache.org/docs/latest/sql-programming-guide.html#inferring-the-schema-using-reflection
But this should probably be provided as a collection of utilities, rather
than as a core feature of the programming model.
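To make the "collection of utilities" idea a bit more concrete, here is a
very rough sketch of the kind of thing I have in mind (purely hypothetical -
nothing like this exists in the SDK today): a reflection-based DoFn that
projects chosen fields of arbitrary POJOs into maps, in the spirit of Spark's
reflection-based schema inference:

  import java.lang.reflect.Field;
  import java.util.HashMap;
  import java.util.Map;
  import org.apache.beam.sdk.transforms.DoFn;

  // Hypothetical utility: project the named fields of any POJO into a Map.
  class ProjectFieldsFn<T> extends DoFn<T, Map<String, Object>> {
    private final String[] fields;

    ProjectFieldsFn(String... fields) {
      this.fields = fields;
    }

    @ProcessElement
    public void processElement(ProcessContext c) throws Exception {
      Map<String, Object> row = new HashMap<>();
      for (String name : fields) {
        Field f = c.element().getClass().getDeclaredField(name);
        f.setAccessible(true);
        row.put(name, f.get(c.element()));
      }
      c.output(row);
    }
  }

  // Usage, with MyEvent being any user POJO (a Coder for the output maps
  // would still have to be set on the resulting PCollection):
  //   events.apply(ParDo.of(new ProjectFieldsFn<MyEvent>("user", "timestamp")));

Things like this can live entirely outside the core model - they are just
DoFn's.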
>
> Regards
> JB
>
> On 09/20/2016 03:59 AM, Eugene Kirpichov wrote:
> > Hi!
> >
> > Thank you for raising these issues. Comments inline.
> >
> > On Fri, Sep 16, 2016 at 7:19 AM Jean-Baptiste Onofré <[email protected]>
> > wrote:
> >
> >> Hi all,
> >>
> >> as you may know I'm working on different new IOs for Beam.
> >>
> >> I have some suggestions that I would like to discuss with you all.
> >>
> >> 1/ Sink
> >> The SDK provides a Sink abstract class. It represents a resource that
> >> can be written using a Write transform:
> >>
> >> p.apply(Write.to(new MySink()));
> >>
> >> The Sink creates a Writer. A Writer is an abstract class where we
> >> override the open(), write(), close() methods.
> >> Today, no IOs use Sink: they directly use a DoFn.
> >> I fully agree that it's very convenient to implement a Sink, but it may
> >> appear inconsistent and can "perturb" the users/developers.
> >>
> >> That brings me to the second point.
> >>
> >> 2/ Source
> >> Today, an IO Read apply() method uses a source via Read.from(source).
> >>
> >> However, if the Source API is not required (for instance in the case of
> >> JDBC, where we can't really implement getEstimatedSizeBytes() and
> >> splitIntoBundles()), it's possible to directly use a DoFn instead of a
> >> Source.
> >>
> >> So again, it could appear inconsistent.
> >>
> >> Maybe it would make more sense to "force" the usage of Source even if we
> >> don't leverage all Source features (for instance, in the case of JDBC
> >> IO, getEstimatedSizeBytes() would return 0L and splitIntoBundles() would
> >> return a list with a single source).
> >> The same for Sink: even if a sink can be implemented with a DoFn, it
> >> would be more consistent to implement it with Sink (or remove Sink ;)).
> >>
> >
> > I understand where you're coming from: when one wants to implement a
> > connector, it'd be great if there was an obvious starting point: e.g. "if
> > I want to read data, I implement a Source; if I want to write data, I
> > implement a Sink". As a reviewer of your IO PRs, I can also definitely
> > see how frustrating it is to implement something, say, as a Source, and
> > then reimplement it without the Source API, having discovered that the
> > API is overkill and the connector can be implemented much more simply -
> > it feels like wasted work due to being misled by an API with a tempting
> > name.
> >
> > However, the Source and Sink APIs are legitimately clumsy to use (this is
> > why connectors become much simpler when you can avoid using them!), and
> > I'd feel bad forcing them on users, as well as perpetuating their use
> > while significantly better alternatives are being developed (Splittable
> > DoFn; and see Dan Halperin's latest email about sinks). In this light, if
> > I had to choose between removing Sink and forcing it on users, I'd choose
> > to remove it.
> >
> > The real issue here is that they have very tempting, overloaded names
> > that mislead users (e.g. "source" means both "a connector that reads
> > data" and "a subclass of Source"); and, perhaps, the lack of an obviously
> > accessible document providing guidance on how to develop a connector.
> >
> > Here's how I think about these APIs and about developing connectors.
> > In the thoughts below, "source" will mean "a connector that reads data"
> > and "sink" will mean "a connector that writes data".
> > - A connector is nothing special - just like any other transform, it does
> > stuff internally and produces and/or consumes PCollection's. You develop
> > a connector like any other transform: by hacking it together from ParDo's
> > and GroupByKey's and other simple primitives.
> > - There may be a bunch of common patterns; e.g. many sinks will want to
> > do buffering and write things in batches. When writing bounded data to
> > files, you probably want to write to a temporary location and finally
> > have a global rename step. Many sources will want to have a fusion break
> > between splitting themselves into parts and reading from the parts. Etc.
> > The SDK should provide simple, small, orthogonal utilities for these
> > purposes, so that it's easier for you to hack your connector together.
> > - Runners may implement advanced features, such as liquid sharding,
> > progress or backlog signals, size estimation, etc., that require
> > collaboration from your connector. This is heavy artillery, and almost
> > always you shouldn't even need to know it exists, unless you feel that,
> > for your connector, implementing these features is 1) possible and
> > 2) worth the trouble. In that case, you use a special advanced API. Right
> > now, all of these advanced runner features happen to be specific to
> > sources, and happen to be encapsulated in an advanced API with a
> > deceptively accessible-sounding name, "Source". But that needn't be the
> > case (and Splittable DoFn makes this *not* be the case - it moves these
> > advanced features into an optional feature of DoFn's).
> >
> > Ideally, we'd have a public document - a "guide on developing
> > connectors". It would contain best practices, common patterns and
> > pitfalls, and such - e.g. "batch your write RPCs", "don't forget a fusion
> > break", "remember to close resources", "remember about retries", "here's
> > what a concise builder pattern looks like", "if you're writing bounded
> > data to files, consider using this utility transform", "here are some
> > low-level utilities for globbing/opening/renaming/... files on GCS", etc.
> > And yes, there would be no Sink API at all. And a few examples of
> > connectors developed according to these guidelines.
> > Somewhere toward the end, the document would make a note about advanced
> > features that some sources can support and runners can hook into, and
> > would say there's an API for that if you *really* know what you're doing.
> > The Source class would be renamed to something more scary, like
> > "Parallel{Bounded,Unbounded}Input". When Splittable DoFn is ready, it
> > would be mentioned in the same section too.
> >
> > What do you think?
> >
> >
> >> 3/ Type Converter
> >> Today each IO represents the elements in the PCollection however it
> >> wants. For instance, the following pipeline won't compile as is:
> >>
> >> p.apply(JmsIO.Read()...)     // returns a PCollection<JmsRecord>
> >>  .apply(JdbcIO.Write()...)   // expects a PCollection<JdbcDataRecord>
> >>
> >> The user will have to "convert" the PCollection<JmsRecord> into a
> >> PCollection<JdbcDataRecord>.
> >>
> >> Maybe it makes sense to provide a Converter in the IOs and use a kind of
> >> schema and canonical format (optionally), for instance based on Avro.
> >>
> > I think this point is not specific to IO. Sounds like you want general
> > support for data objects that require less hassle to manipulate in a
> > pipeline than plain Java objects. Or utilities that would make
> > manipulating Java objects less of a hassle (e.g. something like:
> > automatic DoFn's for projecting a subset of fields, or filtering on the
> > contents of a field, etc). This sounds interesting and I'm curious to
> > hear your ideas on what this could look like!
> >
> >
> >> I added this point in the "Technical Vision" a while ago, but I think it
> >> would simplify the way of writing pipelines.
> >>
> >> Thoughts ?
> >>
> >> Regards
> >> JB
> >> --
> >> Jean-Baptiste Onofré
> >> [email protected]
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >>
> >
>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
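P.S. On the type-converter point in the quoted thread above: note that today
the bridge between two IOs is "just" a ParDo (or MapElements) that the user
writes, so the question is really whether the SDK should shrink that
boilerplate. Roughly - JdbcDataRecord here stands in for whatever element
type the JDBC write ends up expecting, and the mapping function is of course
user-written:

  PCollection<JmsRecord> records = ...;  // output of the JMS read
  PCollection<JdbcDataRecord> rows =
      records.apply(ParDo.of(new DoFn<JmsRecord, JdbcDataRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          // Map the JMS payload/properties into whatever the JDBC write expects.
          c.output(convertToJdbcDataRecord(c.element()));
        }
      }));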
