Hi,

Can you share your code? I will help you fix it. I suggest checking the expand() method in the read PTransform and the way you use generics there.
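
To illustrate what I mean, here is a minimal sketch of one way this exception can appear. I have not seen your code, so this is only a guess at the cause. PBegin, PCollection and PTransform below are simplified stand-ins written for this example, not the Beam classes, and BadRead, GoodRead and applyAtRoot are hypothetical names. The point is that a read transform is applied at the pipeline root, so its input type should be PBegin (as in JdbcIO.Read); if it is declared with PCollection as input and the mismatch slips past the compiler through a raw or unchecked type, the cast fails only at runtime.

```java
import java.util.Collections;
import java.util.List;

// Stand-in types for illustration only; these are NOT the Beam classes.
public class ExpandGenericsSketch {
    static class PBegin {}

    static class PCollection<T> {
        final List<T> elements;
        PCollection(List<T> elements) { this.elements = elements; }
    }

    abstract static class PTransform<InputT, OutputT> {
        abstract OutputT expand(InputT input);
    }

    // Hypothetical mistake: a source declared with PCollection<T> as its input type.
    static class BadRead<T> extends PTransform<PCollection<T>, PCollection<T>> {
        @Override
        PCollection<T> expand(PCollection<T> input) {
            return new PCollection<>(Collections.emptyList());
        }
    }

    // A source starts from PBegin, so that is the input type expand() should take.
    static class GoodRead<T> extends PTransform<PBegin, PCollection<T>> {
        @Override
        PCollection<T> expand(PBegin input) {
            return new PCollection<>(Collections.emptyList());
        }
    }

    // Mimics applying a transform at the pipeline root through a raw type.
    // The compiler-generated bridge method in the subclass casts the argument
    // to the declared input type, which is where the exception is thrown.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String applyAtRoot(PTransform transform) {
        try {
            transform.expand(new PBegin());
            return "ok";
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        System.out.println(applyAtRoot(new GoodRead<String>())); // prints "ok"
        System.out.println(applyAtRoot(new BadRead<String>()));  // prints "ClassCastException"
    }
}
```

If your Read<T> is already declared with PBegin as input, the problem is probably elsewhere, for example in how withParser() or withCoder() rebuilds the transform.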

Do you plan to donate this IO? I would be happy to review the PR!

Do you leverage any InfluxDB features (like splitting/sharding)?

Regards
JB

On 06/30/2017 07:26 AM, P. Ramanjaneya Reddy wrote:
Hi Beam Dev,

We have developed our own SDK IO functions for InfluxDBIO read/write
operations in Apache Beam. It works with the default coder, which is
StringUtf8Coder.of().

  PCollection<String> output = pipeline.apply(
             InfluxDbIO.<String>read()
                     .withUri("http://localhost:8086")
                     .withDatabase("beam"));




Following the MongoDB and JDBC IOs as a reference, we also implemented the
read function with a setCoder() option in InfluxDbIO, but it is not working.

     PCollection<String> output = pipeline.apply(
             InfluxDbIO.<String>read()
                     .withParser(new InfluxDbIO.Parser<String>() {
                       @Override
                       public void parse(String input,
                                       InfluxDbIO.ParserCallback<String> callback)
                           throws IOException {
                         callback.output(input);
                       }
                     })
                     .withUri("http://localhost:8086")
                     .withDatabase("beam")
                     .withCoder(StringUtf8Coder.of()));

With the coder set, we get this error:

java.lang.ClassCastException: org.apache.beam.sdk.values.PBegin cannot be
cast to org.apache.beam.sdk.values.PCollection

Thanks & Regards,
Ramanjaneya


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
