One comment I had that I realized was worth bringing back to the mailing list:
The Write transform here does batching using startBundle/finishBundle, but
I suspect it'd be better to use the GroupIntoBatches transform before doing
the actual write. I *think* our general guidance in the future should be
that write transforms that do batching should use GroupIntoBatches instead
of startBundle/finishBundle - or am I missing something? (A sketch of the
GroupIntoBatches pattern follows the quoted thread below.)

S

On Wed, May 24, 2017 at 9:49 AM Seshadri Raghunathan <[email protected]> wrote:

> Thanks all for your review and comments. I will raise a PR against
> https://issues.apache.org/jira/browse/BEAM-2357
>
> Regards,
> Seshadri
> 408 601 7548
>
> -----Original Message-----
> From: Ismaël Mejía [mailto:[email protected]]
> Sent: Wednesday, May 24, 2017 2:57 AM
> To: [email protected]
> Subject: Re: [New Proposal] Hive connector using native api
>
> One quick thing I forgot to mention: maybe it is a good idea for the
> people working on the Beam SQL implementation to take a look at their
> needs for this IO, considering that it could be quite useful to test
> the SQL (given the structured nature of HCatalog).
>
> On Wed, May 24, 2017 at 11:54 AM, Ismaël Mejía <[email protected]> wrote:
> > Hello,
> >
> > I created a new JIRA for this native implementation of the IO, so feel
> > free to PR the 'native' implementation using this ticket.
> > https://issues.apache.org/jira/browse/BEAM-2357
> >
> > We will discuss all the small details in the PR.
> >
> > The old JIRA (BEAM-1158) will still be there, just to add the read
> > example for HCatalog using HIFIO.
> >
> > Regards,
> > Ismaël
> >
> > On Wed, May 24, 2017 at 8:03 AM, Jean-Baptiste Onofré <[email protected]> wrote:
> >> Hi,
> >>
> >> It looks good. I just saw some issues:
> >>
> >> - the javadoc in HiveIO is not correct (it says write() for read ;)).
> >> - the estimated size is global to the table (it doesn't consider the
> >>   filter). It's not a big deal, but it should be documented.
> >> - you don't use the desired bundle size provided by the runner for
> >>   the split. You are using the Hive split count, which is fine; just
> >>   explain that in the main javadoc maybe.
> >> - the reader should set current to null when nothing is read.
> >> - getCurrent() should throw NoSuchElementException when current is
> >>   null.
> >> - in the writer, the flush should happen at the end of the batch as
> >>   you did, but also when the bundle is finished.
> >>
> >> Thanks!
> >> Great work.
> >>
> >> Regards
> >> JB
> >>
> >> On 05/24/2017 01:36 AM, Seshadri Raghunathan wrote:
> >>>
> >>> Hi,
> >>>
> >>> You can find a draft implementation of the same here:
> >>>
> >>> HiveIO Source -
> >>> https://github.com/seshadri-cr/beam/commit/b74523c13e03dc70038bc1e348ce270fbb3fd99b
> >>>
> >>> HiveIO Sink -
> >>> https://github.com/seshadri-cr/beam/commit/0008f772a989c8cd817a99987a145fbf2f7fc795
> >>>
> >>> Please let us know your comments and suggestions.
> >>>
> >>> Regards,
> >>> Seshadri
> >>> 408 601 7548
> >>>
> >>> From: Madhusudan Borkar [mailto:[email protected]]
> >>> Sent: Tuesday, May 23, 2017 3:12 PM
> >>> To: [email protected]; Seshadri Raghunathan
> >>> <[email protected]>; Rajesh Pandey <[email protected]>
> >>> Subject: [New Proposal] Hive connector using native api
> >>>
> >>> Hi,
> >>>
> >>> HadoopIO can be used to read from Hive, but it doesn't provide a way
> >>> to write to Hive. This new proposal for a Hive connector includes
> >>> both a source and a sink. It uses the Hive native API.
> >>>
> >>> Apache HCatalog provides a way to read from and write to Hive
> >>> without using MapReduce. HCatReader reads data from the cluster,
> >>> using the basic storage abstraction of tables and rows. HCatWriter
> >>> writes to the cluster, and a batching process will be used to write
> >>> in bulk. Please refer to the Apache documentation on HCatalog
> >>> ReaderWriter:
> >>> https://cwiki.apache.org/confluence/display/Hive/HCatalog+ReaderWriter
> >>>
> >>> Solution:
> >>>
> >>> It will work like:
> >>>
> >>> pipeline.apply(HiveIO.read()
> >>>     .withMetastoreUri("uri")     // mandatory
> >>>     .withTable("myTable")        // mandatory
> >>>     .withDatabase("myDb")        // optional, assumes default if none specified
> >>>     .withPartition("partition")) // optional, should be specified if
> >>>                                  // the table is partitioned
> >>>
> >>> pipeline.apply(HiveIO.write()
> >>>     .withMetastoreUri("uri")     // mandatory
> >>>     .withTable("myTable")        // mandatory
> >>>     .withDatabase("myDb")        // optional, assumes default if none specified
> >>>     .withPartition("partition")  // optional
> >>>     .withBatchSize(size))        // optional
> >>>
> >>> Please let us know your comments and suggestions.
> >>>
> >>> Madhu Borkar
> >>>
> >>
> >> --
> >> Jean-Baptiste Onofré
> >> [email protected]
> >> http://blog.nanthrax.net
> >> Talend - http://www.talend.com
> >
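
To make the GroupIntoBatches suggestion at the top of this thread concrete,
here is a minimal, self-contained sketch. It is not the proposed HiveIO
code: the Create.of input strings, the constant "shard" key, the batch size
of 2, and the println standing in for an HCatWriter flush are all
illustrative placeholders.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;

public class GroupIntoBatchesWriteSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(Create.of("r1", "r2", "r3", "r4", "r5"))  // stand-in for real records
        // GroupIntoBatches requires a keyed input; a constant key is the
        // simplest choice here, at the cost of parallelism (real code
        // would spread records over several shard keys).
        .apply(WithKeys.of("shard"))
        .apply(GroupIntoBatches.ofSize(2))
        .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            // One call per batch: this is where a writer would flush,
            // instead of buffering records in DoFn state between
            // startBundle and finishBundle.
            System.out.println("writing batch: " + c.element().getValue());
          }
        }));

    p.run().waitUntilFinish();
  }
}

The point of the pattern is that batches arrive as explicit
KV<key, Iterable<record>> elements, so the batching is visible to the
runner rather than hidden in per-DoFn buffers.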

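Similarly, since the proposal builds on the HCatalog data-transfer API, a
minimal read sketch along the lines of the HCatalog ReaderWriter wiki page
linked above may help readers unfamiliar with it. The metastore URI,
database, and table names are placeholders, error handling is omitted, and
the method names are a best-effort rendering of that wiki page rather than
an authoritative reference.

import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;

public class HCatalogReadSketch {
  public static void main(String[] args) throws Exception {
    Map<String, String> config = new HashMap<>();
    config.put("hive.metastore.uris", "thrift://metastore-host:9083");  // placeholder

    // "Master" side: describe the table and prepare the read; the
    // resulting ReaderContext is serializable and knows the splits.
    ReadEntity entity = new ReadEntity.Builder()
        .withDatabase("myDb")
        .withTable("myTable")
        .build();
    HCatReader masterReader = DataTransferFactory.getHCatReader(entity, config);
    ReaderContext context = masterReader.prepareRead();

    // "Slave" side: each split is read independently, which is what a
    // Beam source would map to parallel bundles.
    for (int split = 0; split < context.numSplits(); split++) {
      HCatReader splitReader = DataTransferFactory.getHCatReader(context, split);
      Iterator<HCatRecord> records = splitReader.read();
      while (records.hasNext()) {
        System.out.println(records.next());
      }
    }
  }
}

Writing mirrors this on the same wiki page: a WriteEntity plus
DataTransferFactory.getHCatWriter(entity, config) and prepareWrite() on the
master, getHCatWriter(writerContext) plus write(iterator) on the workers,
and a final commit(writerContext) on the master, which is where the
batching discussed above would plug in.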