I'm available on Hangouts.
On Fri, Jun 30, 2017 at 11:25 AM, Jean-Baptiste Onofré <[email protected]>
wrote:
> Thanks, let me paste it into an editor and do a review.
>
> I'll keep you posted. By the way, are you on Slack or Hangouts to chat quickly?
>
> Regards
> JB
>
>
> On 06/30/2017 07:46 AM, P. Ramanjaneya Reddy wrote:
>
>> Hi JB,
>>
>> Here is the complete code for the InfluxDB read/write methods; I have
>> followed the same generic approach as the other IOs.
>>
>> package org.apache.beam.sdk.io.influxdb;
>>
>> import static com.google.common.base.Preconditions.checkArgument;
>> import static com.google.common.base.Preconditions.checkNotNull;
>> import static com.google.common.base.Preconditions.checkState;
>>
>> import com.google.auto.value.AutoValue;
>> import org.apache.beam.sdk.coders.Coder;
>> import org.apache.beam.sdk.coders.SerializableCoder;
>> import org.apache.beam.sdk.io.BoundedSource;
>> import org.apache.beam.sdk.options.PipelineOptions;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.transforms.PTransform;
>> import org.apache.beam.sdk.transforms.ParDo;
>> import org.apache.beam.sdk.transforms.display.DisplayData;
>> import org.apache.beam.sdk.values.PBegin;
>> import org.apache.beam.sdk.values.PCollection;
>>
>> import org.apache.beam.sdk.values.PDone;
>> import org.influxdb.InfluxDB;
>> import org.influxdb.InfluxDBFactory;
>> import org.influxdb.dto.BatchPoints;
>> import org.influxdb.dto.Point;
>> import org.influxdb.dto.Query;
>> import org.influxdb.dto.QueryResult;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.IOException;
>> import java.io.Serializable;
>> import java.util.ArrayList;
>> import java.util.Iterator;
>> import java.util.List;
>> import java.util.NoSuchElementException;
>> import java.util.concurrent.TimeUnit;
>> import javax.annotation.Nullable;
>>
>> /**
>> * IO to read data from and write data to InfluxDB.
>> */
>> public class InfluxDbIO {
>> private static final Logger LOG = LoggerFactory.getLogger(InfluxDbIO.class);
>>
>> private InfluxDbIO() {
>> }
>>
>> /**
>> * Callback for the parser to use to submit data.
>> */
>> public interface ParserCallback<T> extends Serializable {
>> /**
>> * Output the parsed object.
>> * @param output
>> */
>> void output(T output);
>>
>> }
>>
>> /**
>> * Interface for the parser that is used to parse an InfluxDB record
>> * into the appropriate type.
>> * @param <T> type produced by the parser
>> */
>> public interface Parser<T> extends Serializable {
>> void parse(String input, ParserCallback<T> callback) throws
>> IOException;
>> }
>>
>> /**
>> * For the default {@code Read<String>} case, this is the parser that is
>> * used: it simply emits each record as a String.
>> */
>> private static final Parser<String> TEXT_PARSER = new
>> Parser<String>() {
>> @Override
>> public void parse(String input, ParserCallback<String> callback)
>> throws IOException {
>> callback.output(input);
>> }
>> };
>>
>> /**
>> * Read data from InfluxDB.
>> *
>> * @param <T> Type of the data to be read.
>> */
>> public static <T> Read<T> read() {
>> return new AutoValue_InfluxDbIO_Read.Builder<T>().build();
>> }
>>
>> /**
>> * A {@link PTransform} to read data from InfluxDB.
>> */
>> @AutoValue
>> public abstract static class Read<T> extends PTransform<PBegin,
>> PCollection<T>> {
>> @Nullable abstract Parser<T> getparser();
>> @Nullable abstract String geturi();
>> @Nullable abstract String getdatabase();
>> @Nullable abstract Coder<T> getCoder();
>>
>> abstract Builder<T> toBuilder();
>>
>> @AutoValue.Builder
>> abstract static class Builder<T> {
>> abstract Builder<T> setParser(Parser<T> parser);
>> abstract Builder<T> setUri(String uri);
>> abstract Builder<T> setDatabase(String database);
>> abstract Builder<T> setCoder(Coder<T> coder);
>> abstract Read<T> build();
>> }
>>
>> public <X> Read<X> withParser(Parser<X> parser) {
>> checkNotNull(parser);
>> Builder<X> builder = (Builder<X>) toBuilder();
>> return builder.setParser(parser).setCoder(null).build();
>> }
>>
>> /**
>> * Define the InfluxDB connection URI, e.g. {@code http://localhost:8086}.
>> */
>> public Read<T> withUri(String uri) {
>> checkNotNull(uri);
>> return toBuilder().setUri(uri).build();
>> }
>>
>> public Read<T> withDatabase(String database) {
>> checkNotNull(database);
>> return toBuilder().setDatabase(database).build();
>> }
>>
>> public Read<T> withCoder(Coder<T> coder) {
>> checkArgument(coder != null,
>> "InfluxDbIO.read().withCoder(coder) called with null coder");
>> return toBuilder().setCoder(coder).build();
>> }
>>
>> @Override
>> public PCollection<T> expand(PBegin input) {
>>
>> PCollection<String> inflxDb =
>> input.apply(org.apache.beam.sdk.io.Read.from(new
>> BoundedInfluxDbSource(this)));
>>
>> System.out.println("***************Get InfluxDB
>> ***********");
>>
>> PCollection<String> output = inflxDb.apply("Print List",
>> ParDo.of(new DoFn<String, String>() {
>>
>> @ProcessElement
>> public void processElement(ProcessContext c) {
>> System.out.println("Print List: " +
>> c.element());
>> c.output(c.element());
>> }
>> }));
>>
>>
>> PCollection<T> poutput = output.apply(ParDo.of(new
>> DoFn<String, T>() {
>>
>> @ProcessElement
>> public void processElement(final ProcessContext c)
>> throws IOException {
>>
>> System.out.println("***************processElement***********");
>>
>> String entry = c.element();
>> getparser().parse(entry, new ParserCallback<T>() {
>> @Override
>> public void output(T output) {
>> c.output(output);
>> }
>> });
>> }
>>
>> }));
>>
>> System.out.println("***************Get InfluxDB Finished
>> ***********");
>>
>> return poutput;
>> }
>>
>> @Override
>> public void validate(PBegin input) {
>> checkNotNull(geturi(), "uri");
>> checkNotNull(getdatabase(), "database");
>> checkState(getCoder() != null,
>> "InfluxDBIO.read() requires a coder to be set via
>> withCoder(coder)");
>> }
>>
>> @Override
>> public void populateDisplayData(DisplayData.Builder builder) {
>> super.populateDisplayData(builder);
>> builder.add(DisplayData.item("uri", geturi()));
>> builder.add(DisplayData.item("database", getdatabase()));
>> builder.add(DisplayData.item("coder",
>> getCoder().getClass().getName()));
>> }
>>
>> /** A {@link DoFn} executing the query to read from the database. */
>> static class ReadFn<T> extends DoFn<String, T> {
>> private InfluxDbIO.Read<T> spec;
>>
>> private ReadFn(Read<T> spec) {
>> this.spec = spec;
>> }
>>
>> @ProcessElement
>> public void processElement(ProcessContext context) throws
>> Exception {
>> System.out.println("InfluxDB entry: " +
>> context.element());
>> context.output((T) context.element()); // Rama: Need
>> to check the implementation
>> }
>> }
>> }
>>
>> private static class BoundedInfluxDbSource extends
>> BoundedSource<String> {
>> private Read spec;
>>
>> public BoundedInfluxDbSource(Read spec) {
>> this.spec = spec;
>> }
>>
>> @Override
>> public Coder<String> getDefaultOutputCoder() {
>> return SerializableCoder.of(String.class);
>> //StringUtf8Coder.of();
>> //SerializableCoder.of(String.class);
>> }
>>
>> @Override
>> public void validate() {
>> spec.validate(null);
>> }
>>
>> @Override
>> public void populateDisplayData(DisplayData.Builder builder) {
>> spec.populateDisplayData(builder);
>> }
>>
>> @Override
>> public BoundedReader<String> createReader(PipelineOptions
>> options) throws IOException {
>> return new BoundedInfluxDbReader(this);
>> }
>>
>> @Override
>> public long getEstimatedSizeBytes(PipelineOptions options)
>> throws Exception {
>> System.out.println("getEstimatedSizeBytes -->");
>> InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
>> influxDB.createDatabase(spec.getdatabase());
>> Query query = new Query("SELECT * FROM " +
>> spec.getdatabase(), spec.getdatabase());
>> QueryResult queryResult = influxDB.query(query);
>>
>> List databaseNames =
>> queryResult.getResults().get(0).getSeries().get(0).getValues();
>>
>> int size = 0;
>> if(databaseNames != null) {
>> Iterator var4 = databaseNames.iterator();
>>
>> while(var4.hasNext()) {
>> List database = (List)var4.next();
>> size += database.size();
>> }
>> }
>>
>> return size;
>> }
>>
>> @Override
>> public List<? extends BoundedSource<String>>
>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
>> throws Exception {
>> List<BoundedSource<String>> sources = new
>> ArrayList<BoundedSource<String>>();
>>
>> InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
>> influxDB.createDatabase(spec.getdatabase());
>> Query query = new Query("SELECT * FROM " +
>> spec.getdatabase(), spec.getdatabase());
>> QueryResult queryResult = influxDB.query(query);
>>
>> List databaseNames =
>> queryResult.getResults().get(0).getSeries().get(0).getValues();
>>
>> int size = 0;
>> if(databaseNames != null) {
>> Iterator var4 = databaseNames.iterator();
>>
>> while(var4.hasNext()) {
>> List database = (List)var4.next();
>> sources.add(this);// Need to check .. Rama
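>> // Note: adding this whole source once per series value means every
>> // bundle re-reads the entire query result; splitting would normally
>> // give each sub-source its own distinct slice of the data
>> // (e.g. one query per chunk).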
>> }
>> }
>>
>> checkArgument(!sources.isEmpty(), "No primary shard found");
>> return sources;
>> }
>>
>> }
>>
>> private static class BoundedInfluxDbReader extends
>> BoundedSource.BoundedReader<String> {
>> private final BoundedInfluxDbSource source;
>>
>> private Iterator cursor;
>> private List current;
>> private InfluxDB influxDB;
>>
>> public BoundedInfluxDbReader(BoundedInfluxDbSource source) {
>> this.source = source;
>> }
>>
>> @Override
>> public boolean start() throws IOException {
>> Read spec = source.spec;
>>
>> System.out.println("*******Query InfluxDb********* " +
>> spec.getdatabase() + " URI: " + spec.geturi());
>> influxDB = InfluxDBFactory.connect(spec.geturi());
>> influxDB.createDatabase(spec.getdatabase());
>> Query query = new Query("SELECT * FROM " +
>> spec.getdatabase(), spec.getdatabase());
>> QueryResult queryResult = influxDB.query(query);
>>
>> List databaseNames =
>> queryResult.getResults().get(0).getSeries().get(0).getValues();
>>
>> if(databaseNames != null) {
>> cursor = databaseNames.iterator();
>> }
>>
>> return advance();
>> }
>>
>> @Override
>> public boolean advance() throws IOException {
>> if (cursor != null && cursor.hasNext()) {
>> current = (List) cursor.next();
>> return true;
>> } else {
>> return false;
>> }
>> }
>>
>> @Override
>> public BoundedSource<String> getCurrentSource() {
>> return source;
>> }
>>
>> @Override
>> public String getCurrent() throws NoSuchElementException {
>> return current.toString();
>> }
>>
>> @Override
>> public void close() throws IOException {
>> return;
>> // try {
>> // if (cursor != null) {
>> // cursor.close();
>> // }
>> // } catch (Exception e) {
>> // LOG.warn("Error closing MongoDB cursor", e);
>> // }
>> // try {
>> // client.close();
>> // } catch (Exception e) {
>> // LOG.warn("Error closing MongoDB client", e);
>> // }
>> }
>> }
>>
>> /** Write data to InfluxDB. */
>> public static Write write() {
>> return new AutoValue_InfluxDbIO_Write.Builder().setBatchSize(1024L).build();
>> }
>>
>> /**
>> * A {@link PTransform} to write to an InfluxDB database.
>> */
>> @AutoValue
>> public abstract static class Write extends
>> PTransform<PCollection<String>, PDone> {
>> @Nullable abstract String uri();
>> @Nullable abstract String database();
>> abstract long batchSize();
>>
>> abstract Builder toBuilder();
>>
>> @AutoValue.Builder
>> abstract static class Builder {
>> abstract Builder setUri(String uri);
>> abstract Builder setDatabase(String database);
>> abstract Builder setBatchSize(long batchSize);
>> abstract Write build();
>> }
>>
>> public Write withUri(String uri) {
>> return toBuilder().setUri(uri).build();
>> }
>>
>> public Write withDatabase(String database) {
>> return toBuilder().setDatabase(database).build();
>> }
>>
>> public Write withBatchSize(long batchSize) {
>> return toBuilder().setBatchSize(batchSize).build();
>> }
>>
>> @Override
>> public PDone expand(PCollection<String> input) {
>> input.apply(ParDo.of(new WriteFn(this)));
>> return PDone.in(input.getPipeline());
>> }
>>
>> @Override
>> public void validate(PCollection<String> input) {
>> checkNotNull(uri(), "uri");
>> checkNotNull(database(), "database");
>> checkNotNull(batchSize(), "batchSize");
>> }
>>
>> private static class WriteFn extends DoFn<String, Void> {
>> private final Write spec;
>> private InfluxDB influxDB;
>> private List<String> batch;
>> public WriteFn(Write spec) {
>> this.spec = spec;
>> }
>>
>> @Setup
>> public void createInfluxDb() throws Exception {
>> //client = new MongoClient(new
>> MongoClientURI(spec.uri()));
>> System.out.println("*******createInfluxDb********* "
>> + spec.database() + " URI: " + spec.uri());
>> if (influxDB == null) {
>> influxDB = InfluxDBFactory.connect(spec.uri());
>> influxDB.createDatabase(spec.database());
>> System.out.println("*******createDatabase: " +
>> spec.database());
>> }
>> }
>>
>> @StartBundle
>> public void startBundle(Context ctx) throws Exception {
>> batch = new ArrayList<String>();
>> }
>>
>> @ProcessElement
>> public void processElement(ProcessContext ctx) throws
>> Exception {
>> System.out.println("*******processElement: " +
>> ctx.element());
>>
>> BatchPoints batchPoint;
>> batchPoint = BatchPoints
>> .database(spec.database())
>> .tag("sflow", ctx.element())
>> .retentionPolicy("autogen")
>> .consistency(InfluxDB.ConsistencyLevel.ALL)
>> .build();
>>
>> Point point1 = Point.measurement(spec.database())
>> .time(System.currentTimeMillis(),
>> TimeUnit.MILLISECONDS)
>> .addField("sflow", ctx.element())
>> .build();
>> batchPoint.point(point1);
>>
>> influxDB.write(batchPoint);
>>
>> // influxDB.write(1, ctx.element());
>> // // Need to copy the document because
>> mongoCollection.insertMany() will mutate it
>> // // before inserting (will assign an id).
>> // batch.add(new String(ctx.element()));
>> // if (batch.size() >= spec.batchSize()) {
>> // finishBundle(ctx);
>> // }
>> }
>>
>> @FinishBundle
>> public void finishBundle(Context ctx) throws Exception {
>> // MongoDatabase mongoDatabase =
>> clinet.getDatabase(spec.database());
>> // MongoCollection<Document> mongoCollection =
>> mongoDatabase.getCollection(spec.collection());
>> //
>> // mongoCollection.insertMany(batch);
>>
>> batch.clear();
>> }
>>
>> @Teardown
>> public void closeInfluxClient() throws Exception {
>> //influxDB.deleteDatabase(spec.database());
>> //influxDB = null;
>>
>> /*
>> Query query = new Query("SELECT * FROM " +
>> spec.database(),spec.database());
>>
>> QueryResult queryResult = influxDB.query(query);
>>
>> List<QueryResult.Result> allresults =
>> queryResult.getResults();
>>
>> System.out.println(queryResult.getResults().get(0)
>> .getSeries().get(0).getValues());
>>
>> List temp = queryResult.getResults().get(0)
>> .getSeries().get(0).getValues();
>>
>> if(temp != null) {
>> Iterator var4 = temp.iterator();
>>
>> System.out.println("Start.....");
>> while(var4.hasNext()) {
>> List database = (List)var4.next();
>> System.out.println(database.toString());
>> }
>> }
>> //
>> // for(List<Object> ll : temp) {
>> // System.out.println(ll.get(0));
>> // System.out.println(ll.get(1));
>> // }
>> */
>>
>> System.out.println("*******deleteDatabase: " +
>> spec.database());
>> }
>> }
>> }
>> }
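>>
>> For completeness, here is a rough usage sketch for the write side (untested;
>> Pipeline and Create come from the Beam core SDK, and the URI, database name
>> and batch size below are just placeholders):
>>
>> Pipeline pipeline = Pipeline.create();
>> pipeline
>>     .apply(Create.of("sample-record-1", "sample-record-2")) // placeholder input
>>     .apply(
>>         InfluxDbIO.write()
>>             .withUri("http://localhost:8086")  // placeholder URI
>>             .withDatabase("beam")              // placeholder database name
>>             .withBatchSize(1024L));
>> pipeline.run().waitUntilFinish();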
>>
>>
>>
>> On Fri, Jun 30, 2017 at 11:03 AM, Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>> Hi,
>>>
>>> Can you share your code? I will fix that with you. I suggest checking the
>>> expand() method in the read PTransform and the way you use generics there.
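>>>
>>> For instance, here is a minimal, untested sketch of a common pattern in other
>>> Beam IOs (I think the MongoDB GridFS IO does something similar): the coder
>>> supplied via withCoder() is set explicitly on the parsed output inside
>>> expand(). The accessor names and the source class below are placeholders for
>>> whatever your IO defines, and I am not sure yet whether this is where your
>>> problem comes from:
>>>
>>> @Override
>>> public PCollection<T> expand(PBegin input) {
>>>   PCollection<String> rows =
>>>       input.apply(org.apache.beam.sdk.io.Read.from(new BoundedInfluxDbSource(this)));
>>>   PCollection<T> parsed =
>>>       rows.apply("ParseRows", ParDo.of(new DoFn<String, T>() {
>>>         @ProcessElement
>>>         public void processElement(final ProcessContext c) throws IOException {
>>>           getParser().parse(c.element(), new ParserCallback<T>() {
>>>             @Override
>>>             public void output(T output) {
>>>               c.output(output);
>>>             }
>>>           });
>>>         }
>>>       }));
>>>   if (getCoder() != null) {
>>>     // Attach the user-supplied coder instead of relying on coder inference.
>>>     parsed.setCoder(getCoder());
>>>   }
>>>   return parsed;
>>> }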
>>>
>>> Any plan to donate this IO? I would be happy to review the PR!
>>>
>>> Do you leverage any InfluxDB features (like splitting/sharding)?
>>>
>>> Regards
>>> JB
>>>
>>>
>>> On 06/30/2017 07:26 AM, P. Ramanjaneya Reddy wrote:
>>>
>>> Hi Beam Dev,
>>>>
>>>> We have developed our own SDK IO functions for InfluxDB read/write
>>>> operations in Apache Beam. It works with the default coder, which is
>>>> StringUtf8Coder.of():
>>>>
>>>> PCollection<String> output = pipeline.apply(
>>>> InfluxDbIO.<String>read()
>>>> .withUri("http://localhost:8086")
>>>> .withDatabase("beam"));
>>>>
>>>>
>>>>
>>>>
>>>> With reference to the MongoDB and JDBC IOs, we also implemented the read
>>>> function with the setCoder() option in InfluxDbIO, but it is not working:
>>>>
>>>> PCollection<String> output = pipeline.apply(
>>>> InfluxDbIO.<String>read()
>>>> .withParser(new InfluxDbIO.Parser<String>() {
>>>> @Override
>>>> public void parse(String input,
>>>> InfluxDbIO.ParserCallback<String>
>>>> callback) throws IOException {
>>>> callback.output(input);
>>>> }
>>>> })
>>>> .withUri("http://localhost:8086")
>>>> .withDatabase("beam")
>>>> .withCoder(StringUtf8Coder.of()));
>>>>
>>>> With the coder set, we get the following error:
>>>>
>>>> java.lang.ClassCastException: org.apache.beam.sdk.values.PBegin cannot be
>>>> cast to org.apache.beam.sdk.values.PCollection
>>>>
>>>> Thanks & Regards,
>>>> Ramanjaneya
>>>>
>>>>
>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>