Thanks, let me paste it in an editor and do a review.

I'll keep you posted. By the way, are you on Slack or Hangouts for a quick chat?

Regards
JB

On 06/30/2017 07:46 AM, P. Ramanjaneya Reddy wrote:
Hi JB,

Here is the complete code for the InfluxDB read/write methods. I have
followed the same generic approach as the other IOs.
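
For reference, this is roughly how I am driving it in a test pipeline (the
"beam_copy" database name below is just a placeholder from my local setup):

    // read rows from a local InfluxDB as Strings (default coder case)
    PCollection<String> rows = pipeline.apply(
            InfluxDbIO.<String>read()
                    .withUri("http://localhost:8086")
                    .withDatabase("beam"));

    // write each element back as a point into another database
    rows.apply(
            InfluxDbIO.write()
                    .withUri("http://localhost:8086")
                    .withDatabase("beam_copy")
                    .withBatchSize(100L));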

package org.apache.beam.sdk.io.influxdb;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

import org.apache.beam.sdk.values.PDone;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
 * IO to read data from and write data to InfluxDB.
 */
public class InfluxDbIO {
     private static final Logger LOG = LoggerFactory.getLogger(InfluxDbIO.class);

     private InfluxDbIO() {
     }

     /**
      * Callback for the parser to use to submit data.
      */
     public interface ParserCallback<T> extends Serializable {
         /**
          * Output the parsed object.
          *
          * @param output the element to emit downstream
          */
         void output(T output);

     }

     /**
      * Interface for the parser that is used to parse an entry read from
      * InfluxDB into the appropriate type.
      * @param <T> the type produced by the parser
      */
     public interface Parser<T> extends Serializable {
         void parse(String input, ParserCallback<T> callback) throws IOException;
     }

     /**
      * For the default {@code Read<String>} case, this is the parser that is
      * used: it simply emits each entry read from InfluxDB as a String.
      */
     private static final Parser<String> TEXT_PARSER = new Parser<String>() {
         @Override
         public void parse(String input, ParserCallback<String> callback)
                 throws IOException {
             callback.output(input);
         }
     };

     /**
      * Read data from InfluxDB.
      *
      * @param <T> Type of the data to be read.
      */
     public static <T> Read<T> read() {
         return new AutoValue_InfluxDbIO_Read.Builder<T>().build();
     }

     /**
      * A {@link PTransform} to read data from InfluxDB.
      */
     @AutoValue
     public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
         @Nullable abstract Parser<T> getparser();
         @Nullable abstract String geturi();
         @Nullable abstract String getdatabase();
         @Nullable abstract Coder<T> getCoder();

         abstract Builder<T> toBuilder();

         @AutoValue.Builder
         abstract static class Builder<T> {
             abstract Builder<T> setParser(Parser<T> parser);
             abstract Builder<T> setUri(String uri);
             abstract Builder<T> setDatabase(String database);
             abstract Builder<T> setCoder(Coder<T> coder);
             abstract Read<T> build();
         }

         public <X> Read<X> withParser(Parser<X> parser) {
             checkNotNull(parser);
             Builder<X> builder = (Builder<X>) toBuilder();
             return builder.setParser(parser).setCoder(null).build();
         }

         /**
          * Define the connection URI of the InfluxDB instance to read from.
          */
         public Read<T> withUri(String uri) {
             checkNotNull(uri);
             return toBuilder().setUri(uri).build();
         }

         public Read<T> withDatabase(String database) {
             checkNotNull(database);
             return toBuilder().setDatabase(database).build();
         }

         public Read<T> withCoder(Coder<T> coder) {
             checkArgument(coder != null,
                     "InfluxDbIO.read().withCoder(coder) called with null coder");
             return toBuilder().setCoder(coder).build();
         }

         @Override
         public PCollection<T> expand(PBegin input) {

             PCollection<String> influxDb =
                     input.apply(org.apache.beam.sdk.io.Read.from(new BoundedInfluxDbSource(this)));

             System.out.println("***************Get InfluxDB ***********");

             PCollection<String> output = influxDb.apply("Print List",
                     ParDo.of(new DoFn<String, String>() {

                         @ProcessElement
                         public void processElement(ProcessContext c) {
                             System.out.println("Print List: " + c.element());
                             c.output(c.element());
                         }
                     }));


             PCollection<T> poutput = output.apply(ParDo.of(new DoFn<String, T>() {

                 @ProcessElement
                 public void processElement(final ProcessContext c) throws IOException {

                     System.out.println("***************processElement***********");

                     String entry = c.element();
                     getparser().parse(entry, new ParserCallback<T>() {
                         @Override
                         public void output(T output) {
                             c.output(output);
                         }
                     });
                 }

             }));

             System.out.println("***************Get InfluxDB Finished ***********");

             return poutput;
         }

         @Override
         public void validate(PBegin input) {
             checkNotNull(geturi(), "uri");
             checkNotNull(getdatabase(), "database");
             checkState(getCoder() != null,
                     "InfluxDbIO.read() requires a coder to be set via withCoder(coder)");
         }

         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
             super.populateDisplayData(builder);
             builder.add(DisplayData.item("uri", geturi()));
             builder.add(DisplayData.item("database", getdatabase()));
             builder.add(DisplayData.item("coder",
getCoder().getClass().getName()));
         }

         /** A {@link DoFn} executing the query to read entries from InfluxDB. */
         static class ReadFn<T> extends DoFn<String, T> {
             private InfluxDbIO.Read<T> spec;

             private ReadFn(Read<T> spec) {
                 this.spec = spec;
             }

             @ProcessElement
             public void processElement(ProcessContext context) throws Exception {
                 System.out.println("InfluxDB entry: " + context.element());
                 context.output((T) context.element()); // Rama: Need to check the implementation
             }
         }
     }

     /** A {@link BoundedSource} reading all entries of the configured InfluxDB database. */
     private static class BoundedInfluxDbSource extends BoundedSource<String> {
         private Read spec;

         public BoundedInfluxDbSource(Read spec) {
             this.spec = spec;
         }

         @Override
         public Coder<String> getDefaultOutputCoder() {
             return SerializableCoder.of(String.class);
             //StringUtf8Coder.of();
             //SerializableCoder.of(String.class);
         }

         @Override
         public void validate() {
             spec.validate(null);
         }

         @Override
         public void populateDisplayData(DisplayData.Builder builder) {
             spec.populateDisplayData(builder);
         }

         @Override
         public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
             return new BoundedInfluxDbReader(this);
         }

         @Override
         public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
             System.out.println("getEstimatedSizeBytes -->");
             InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
             influxDB.createDatabase(spec.getdatabase());
             Query query = new Query("SELECT * FROM " + spec.getdatabase(), spec.getdatabase());
             QueryResult queryResult = influxDB.query(query);

             List databaseNames = queryResult.getResults().get(0).getSeries().get(0).getValues();

             int size = 0;
             if (databaseNames != null) {
                 for (Object row : databaseNames) {
                     size += ((List) row).size();
                 }
             }

             return size;
         }

         @Override
         public List<? extends BoundedSource<String>> splitIntoBundles(
                 long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
             List<BoundedSource<String>> sources = new ArrayList<BoundedSource<String>>();

             InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
             influxDB.createDatabase(spec.getdatabase());
             Query query = new Query("SELECT * FROM " + spec.getdatabase(), spec.getdatabase());
             QueryResult queryResult = influxDB.query(query);

             List databaseNames = queryResult.getResults().get(0).getSeries().get(0).getValues();

             if (databaseNames != null) {
                 for (Object row : databaseNames) {
                     // Rama: need to check - adding the whole source once per row does not really split the work
                     sources.add(this);
                 }
             }

             checkArgument(!sources.isEmpty(), "No data found to read from InfluxDB");
             return sources;
         }

     }

     /** A {@link BoundedSource.BoundedReader} iterating over the rows returned by the InfluxDB query. */
     private static class BoundedInfluxDbReader extends BoundedSource.BoundedReader<String> {
         private final BoundedInfluxDbSource source;

         private Iterator cursor;
         private List current;
         private InfluxDB influxDB;

         public BoundedInfluxDbReader(BoundedInfluxDbSource source) {
             this.source = source;
         }

         @Override
         public boolean start() throws IOException {
             Read spec = source.spec;

             System.out.println("*******Query InfluxDb********* "   +
spec.getdatabase() + " URI: " + spec.geturi());
             influxDB = InfluxDBFactory.connect(spec.geturi());
             influxDB.createDatabase(spec.getdatabase());
             Query query = new Query("SELECT * FROM " + spec.getdatabase(), spec.getdatabase());
             QueryResult queryResult = influxDB.query(query);

             List databaseNames = queryResult.getResults().get(0).getSeries().get(0).getValues();

             if (databaseNames == null) {
                 return false;
             }
             cursor = databaseNames.iterator();

             return advance();
         }

         @Override
         public boolean advance() throws IOException {
             if (cursor.hasNext()) {
                 current = (List) cursor.next();
                 return true;
             } else {
                 return false;
             }
         }

         @Override
         public BoundedSource<String> getCurrentSource() {
             return source;
         }

         @Override
         public String getCurrent() throws NoSuchElementException {
             return current.toString();
         }

         @Override
         public void close() throws IOException {
             // nothing to close for now
         }
     }

     /** Write data to InfluxDB. */
     public static Write write() {
         return new AutoValue_InfluxDbIO_Write.Builder().setBatchSize(1024L).build();
     }

     /**
      * A {@link PTransform} to write to an InfluxDB database.
      */
     @AutoValue
     public abstract static class Write extends PTransform<PCollection<String>, PDone> {
         @Nullable abstract String uri();
         @Nullable abstract String database();
         abstract long batchSize();

         abstract Builder toBuilder();

         @AutoValue.Builder
         abstract static class Builder {
             abstract Builder setUri(String uri);
             abstract Builder setDatabase(String database);
             abstract Builder setBatchSize(long batchSize);
             abstract Write build();
         }

         public Write withUri(String uri) {
             return toBuilder().setUri(uri).build();
         }

         public Write withDatabase(String database) {
             return toBuilder().setDatabase(database).build();
         }

         public Write withBatchSize(long batchSize) {
             return toBuilder().setBatchSize(batchSize).build();
         }

         @Override
         public PDone expand(PCollection<String> input) {
             input.apply(ParDo.of(new WriteFn(this)));
             return PDone.in(input.getPipeline());
         }

         @Override
         public void validate(PCollection<String> input) {
             checkNotNull(uri(), "uri");
             checkNotNull(database(), "database");
             checkArgument(batchSize() > 0, "batchSize must be greater than 0");
         }

         /** A {@link DoFn} writing each element as a point into InfluxDB. */
         private static class WriteFn extends DoFn<String, Void> {
             private final Write spec;
             private InfluxDB influxDB;
             private List<String> batch;
             public WriteFn(Write spec) {
                 this.spec = spec;
             }

             @Setup
             public void createInfluxDb() throws Exception {
                 System.out.println("*******createInfluxDb********* "
                         + spec.database() + " URI: " + spec.uri());
                 if (influxDB == null) {
                     influxDB = InfluxDBFactory.connect(spec.uri());
                     influxDB.createDatabase(spec.database());
                     System.out.println("*******createDatabase: " +
spec.database());
                 }
             }

             @StartBundle
             public void startBundle(Context ctx) throws Exception {
                 batch = new ArrayList<String>();
             }

             @ProcessElement
             public void processElement(ProcessContext ctx) throws Exception {
                 System.out.println("*******processElement: " + ctx.element());

                 BatchPoints batchPoint;
                 batchPoint = BatchPoints
                         .database(spec.database())
                         .tag("sflow", ctx.element())
                         .retentionPolicy("autogen")
                         .consistency(InfluxDB.ConsistencyLevel.ALL)
                         .build();

                 Point point1 = Point.measurement(spec.database())
                         .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                         .addField("sflow", ctx.element())
                         .build();
                 batchPoint.point(point1);

                 influxDB.write(batchPoint);

                 // TODO(Rama): batch the writes instead of writing each element individually:
                 // batch.add(ctx.element());
                 // if (batch.size() >= spec.batchSize()) {
                 //     finishBundle(ctx);
                 // }
             }

             @FinishBundle
             public void finishBundle(Context ctx) throws Exception {
                 // TODO(Rama): flush the accumulated batch to InfluxDB here once batching is implemented

                 batch.clear();
             }

             @Teardown
             public void closeInfluxClient() throws Exception {
                 //influxDB.deleteDatabase(spec.database());
                 //influxDB = null;

/*
                 Query query = new Query("SELECT * FROM " +
spec.database(),spec.database());

                 QueryResult queryResult = influxDB.query(query);

                 List<QueryResult.Result> allresults = queryResult.getResults();

                 System.out.println(queryResult.getResults().get(0)
                         .getSeries().get(0).getValues());

                 List temp = queryResult.getResults().get(0)
                         .getSeries().get(0).getValues();

                 if(temp != null) {
                     Iterator var4 = temp.iterator();

                     System.out.println("Start.....");
                     while(var4.hasNext()) {
                         List database = (List)var4.next();
                         System.out.println(database.toString());
                     }
                 }
//
//                for(List<Object> ll : temp) {
//                    System.out.println(ll.get(0));
//                    System.out.println(ll.get(1));
//                }
          */

                 System.out.println("*******deleteDatabase: " + 
spec.database());
             }
         }
     }
}



On Fri, Jun 30, 2017 at 11:03 AM, Jean-Baptiste Onofré <[email protected]>
wrote:

Hi,

Can you share your code? I will fix that with you. I suggest checking the
expand() method in the read PTransform and the way you use generics there.
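
Typically, the expand() in the other IOs keeps the generic type end-to-end and
sets the user-provided coder on the output PCollection. Just a rough sketch of
the pattern (MyBoundedSource and MyReadFn are placeholders here, not your code):

    @Override
    public PCollection<T> expand(PBegin input) {
        // keep the element type T through the whole chain and set the coder
        // explicitly on the output, so the runner never has to guess it
        return input
                .apply(org.apache.beam.sdk.io.Read.from(new MyBoundedSource(this)))
                .apply(ParDo.of(new MyReadFn<T>(this)))
                .setCoder(getCoder());
    }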

Any plan to donate this IO? I would be happy to review the PR!

Do you leverage any InfluxDB features (like splitting/sharding)?

Regards
JB


On 06/30/2017 07:26 AM, P. Ramanjaneya Reddy wrote:

Hi Beam Dev,

We have developed our own SDK IO functions for InfluxDbIO read/write
operations in Apache Beam. It works with the default coder, which is
StringUtf8Coder.of().

   PCollection<String> output = pipeline.apply(
           InfluxDbIO.<String>read()
                   .withUri("http://localhost:8086")
                   .withDatabase("beam"));




With reference to the MongoDB and JDBC IOs, we also implemented the read
function with the withCoder() option in InfluxDbIO, but it is not working.

      PCollection<String> output = pipeline.apply(
              InfluxDbIO.<String>read()
                      .withParser(new InfluxDbIO.Parser<String>() {
                        @Override
                        public void parse(String input,
                                InfluxDbIO.ParserCallback<String> callback) throws IOException {
                          callback.output(input);
                        }
                      })
                      .withUri("http://localhost:8086")
                      .withDatabase("beam")
                      .withCoder(StringUtf8Coder.of()));

With the coder set, we get the following error:

java.lang.ClassCastException: org.apache.beam.sdk.values.PBegin cannot be
cast to org.apache.beam.sdk.values.PCollection

Thanks & Regards,
Ramanjaneya


--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com



--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
