Hi,
It looks good. I just spotted a few issues:
- the javadoc is not correct in HiveIO (it documents write() for the read path ;)).
- the estimated size is global to the table (it doesn't take the filter into
account). It's not a big deal, but it should be documented (see the javadoc
sketch after this list).
- you don't use the desired bundle size provided by the runner for the split.
You are using the Hive split count, which is fine, but maybe explain that in
the main javadoc (also covered in the sketch below).
- the reader should set current to null when nothing is read
- getCurrent() should throw a NoSuchElementException when current is null
(see the reader sketch below)
- in the writer, the flush should happen at the end of each batch as you did,
but also when the bundle is finished (see the writer sketch below)
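To make the two documentation points concrete, here is a minimal sketch of the
kind of javadoc and split behavior I mean. The class and method names are my
assumptions, not what is in your commits, and it assumes a Beam SDK where
BoundedSource#split receives the desired bundle size:

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;

/**
 * A bounded Hive source backed by HCatalog.
 *
 * <p>Note: {@link #getEstimatedSizeBytes} returns the estimated size of the
 * whole table; the filter is not taken into account.
 *
 * <p>Note: the source is split according to the Hive split count; the
 * desired bundle size provided by the runner is not used.
 */
abstract class HiveSourceSketch<T> extends BoundedSource<T> {

  /** Number of splits reported by Hive/HCatalog for the table. */
  abstract int getHiveSplitCount();

  /** A copy of this source restricted to the given Hive split. */
  abstract HiveSourceSketch<T> forSplit(int splitId);

  @Override
  public List<? extends BoundedSource<T>> split(
      long desiredBundleSizeBytes, PipelineOptions options) {
    // desiredBundleSizeBytes is deliberately ignored: we create one source
    // per Hive split, which is exactly what the javadoc above spells out.
    List<BoundedSource<T>> sources = new ArrayList<>();
    for (int i = 0; i < getHiveSplitCount(); i++) {
      sources.add(forSplit(i));
    }
    return sources;
  }
}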
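For the reader contract, something along these lines (a sketch, not the actual
code from the commits; getRecordIterator() is a hypothetical accessor for the
iterator opened in start()):

import java.util.Iterator;
import java.util.NoSuchElementException;
import org.apache.beam.sdk.io.BoundedSource;

/** Sketch of the reader contract for the two points above. */
abstract class HiveReaderSketch<T> extends BoundedSource.BoundedReader<T> {

  private T current;

  /** The underlying HCatalog record iterator, opened in start(). */
  abstract Iterator<T> getRecordIterator();

  @Override
  public boolean advance() {
    if (getRecordIterator().hasNext()) {
      current = getRecordIterator().next();
      return true;
    }
    // Nothing was read: reset current so getCurrent() cannot return
    // a stale record.
    current = null;
    return false;
  }

  @Override
  public T getCurrent() {
    if (current == null) {
      // Contract: getCurrent() must throw when there is no current record.
      throw new NoSuchElementException();
    }
    return current;
  }
}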
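And for the writer, the batching/flush shape I have in mind (writeToHive() is
a hypothetical helper standing in for the HCatWriter call):

import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;

/** Sketch of the writer batching: flush per batch and at end of bundle. */
class HiveWriteFnSketch<T> extends DoFn<T, Void> {

  private final int batchSize;
  private transient List<T> batch;

  HiveWriteFnSketch(int batchSize) {
    this.batchSize = batchSize;
  }

  @StartBundle
  public void startBundle() {
    batch = new ArrayList<>();
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    batch.add(c.element());
    if (batch.size() >= batchSize) {
      flush();
    }
  }

  @FinishBundle
  public void finishBundle() throws Exception {
    // The last (possibly partial) batch must also be written when the
    // bundle is finished, otherwise records are lost.
    flush();
  }

  private void flush() throws Exception {
    if (batch.isEmpty()) {
      return;
    }
    writeToHive(batch); // hypothetical helper: hand the batch to HCatWriter
    batch.clear();
  }

  private void writeToHive(List<T> records) throws Exception {
    // HCatWriter.write(records.iterator()) would go here.
  }
}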
Thanks !
Great work
Regards
JB
On 05/24/2017 01:36 AM, Seshadri Raghunathan wrote:
Hi,
You can find a draft implementation here:
HiveIO Source -
https://github.com/seshadri-cr/beam/commit/b74523c13e03dc70038bc1e348ce270fbb3fd99b
HiveIO Sink -
https://github.com/seshadri-cr/beam/commit/0008f772a989c8cd817a99987a145fbf2f7fc795
Please let us know your comments and suggestions.
Regards,
Seshadri
408 601 7548
From: Madhusudan Borkar [mailto:[email protected]]
Sent: Tuesday, May 23, 2017 3:12 PM
To: [email protected]; Seshadri Raghunathan <[email protected]>; Rajesh
Pandey <[email protected]>
Subject: [New Proposal] Hive connector using native api
Hi,
HadoopIO can be used to read from Hive, but it doesn't provide a way to write
to Hive. This new proposal for a Hive connector includes both a source and a
sink, using the Hive native API.
Apache HCatalog provides a way to read from and write to Hive without using
MapReduce. HCatReader reads data from the cluster using a basic storage
abstraction of tables and rows. HCatWriter writes to the cluster, and a
batching process is used to write in bulk. Please refer to the Apache
documentation on HCatalog ReaderWriter:
https://cwiki.apache.org/confluence/display/Hive/HCatalog+ReaderWriter
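For reference, the read/write flow described on that page looks roughly like
this (a condensed sketch based on the linked documentation; database/table
names are placeholders, and the empty record iterator stands in for the real
records to write):

import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.hive.hcatalog.data.HCatRecord;
import org.apache.hive.hcatalog.data.transfer.DataTransferFactory;
import org.apache.hive.hcatalog.data.transfer.HCatReader;
import org.apache.hive.hcatalog.data.transfer.HCatWriter;
import org.apache.hive.hcatalog.data.transfer.ReadEntity;
import org.apache.hive.hcatalog.data.transfer.ReaderContext;
import org.apache.hive.hcatalog.data.transfer.WriteEntity;
import org.apache.hive.hcatalog.data.transfer.WriterContext;

public class HCatReaderWriterSketch {
  public static void main(String[] args) throws Exception {
    Map<String, String> config = new HashMap<>();

    // Read: prepare on the master, then read each split.
    ReadEntity readEntity =
        new ReadEntity.Builder().withDatabase("myDb").withTable("myTable").build();
    ReaderContext readerContext =
        DataTransferFactory.getHCatReader(readEntity, config).prepareRead();
    for (int split = 0; split < readerContext.numSplits(); split++) {
      HCatReader reader = DataTransferFactory.getHCatReader(readerContext, split);
      Iterator<HCatRecord> it = reader.read();
      while (it.hasNext()) {
        HCatRecord record = it.next(); // one row of the table
      }
    }

    // Write: prepare on the master, write a batch of records, then commit.
    WriteEntity writeEntity =
        new WriteEntity.Builder().withDatabase("myDb").withTable("myTable").build();
    HCatWriter masterWriter = DataTransferFactory.getHCatWriter(writeEntity, config);
    WriterContext writerContext = masterWriter.prepareWrite();
    Iterator<HCatRecord> records = Collections.emptyIterator(); // records to write
    DataTransferFactory.getHCatWriter(writerContext).write(records);
    masterWriter.commit(writerContext);
  }
}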
Solution:
It will work like this:
pipeline.apply(HiveIO.read()
    .withMetastoreUri("uri") // mandatory
    .withTable("myTable") // mandatory
    .withDatabase("myDb") // optional, assumes default if none specified
    .withPartition("partition")); // optional, should be specified if the table is partitioned
pipeline.apply(HiveIO.write()
    .withMetastoreUri("uri") // mandatory
    .withTable("myTable") // mandatory
    .withDatabase("myDb") // optional, assumes default if none specified
    .withPartition("partition") // optional
    .withBatchSize(size)); // optional
Please let us know your comments and suggestions.
Madhu Borkar
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com