Hi,

It looks good. I just saw some issues:

- javadoc is not correct in HiveIO (it says write() for read ;)).
- estimated size is global to the table (doesn't consider the filter). It's not a big deal, but it should be documented. - you don't use the desired bundle size provided by the runner for the split. You are using the Hive split count, which is fine, just explain in the main javadoc maybe.
- the reader should set current to null with nothing is read
- getCurrent() should throw NoSuchElementException in case of current is null
- in the writer, the flush should happen at the end of the batch as you did, but also when the bundle is finished

Thanks !
Great work

Regards
JB

On 05/24/2017 01:36 AM, Seshadri Raghunathan wrote:
Hi,

You can find a draft implementation of the same here :

HiveIO Source - 
https://github.com/seshadri-cr/beam/commit/b74523c13e03dc70038bc1e348ce270fbb3fd99b

HiveIO Sink - 
https://github.com/seshadri-cr/beam/commit/0008f772a989c8cd817a99987a145fbf2f7fc795

Please let us know your comments and suggestions.

Regards,

Seshadri

408 601 7548

From: Madhusudan Borkar [mailto:[email protected]]
Sent: Tuesday, May 23, 2017 3:12 PM
To: [email protected]; Seshadri Raghunathan <[email protected]>; Rajesh 
Pandey <[email protected]>
Subject: [New Proposal] Hive connector using native api

Hi,

HadoopIO can be used to read from Hive. It doesn't provide writing to Hive. 
This new proposal for Hive connector includes both source and sink. It uses 
Hive native api.

Apache HCatalog provides way to read / write to hive without using mapreduce. 
HCatReader reads data from cluster, using basic storage abstraction of tables 
and rows. HCatWriter writes to cluster and a batching process will be used to 
write in bulk. Please refer to Apache documentation on HCatalog ReaderWriter 
https://cwiki.apache.org/confluence/display/Hive/HCatalog+ReaderWriter

Solution:

It will work like:

pipeline.apply(HiveIO.read()

.withMetastoreUri("uri") //mandatory

.withTable("myTable") //mandatory

.withDatabase("myDb") //optional, assumes default if none specified

.withPartition(“partition”) //optional,should be specified if the table is 
partitioned

pipeline.apply(HiveIO.write()

.withMetastoreUri("uri") //mandatory

.withTable("myTable") //mandatory

.withDatabase("myDb") //optional, assumes default if none specified

.withPartition(“partition”) //optional

.withBatchSize(size)) //optional

Please, let us know your comments and suggestions.




Madhu Borkar



--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com

Reply via email to