I'm available @ Hangout

On Fri, Jun 30, 2017 at 11:25 AM, Jean-Baptiste Onofré <[email protected]>
wrote:

> Thanks, let me paste it in an editor and do a review.
>
> I'll keep you posted. By the way, are you on Slack or Hangout to chat
> quickly?
>
> Regards
> JB
>
>
> On 06/30/2017 07:46 AM, P. Ramanjaneya Reddy wrote:
>
>> Hi JB,
>>
>> Here is the complete code for the InfluxDB read/write methods; I have
>> followed the same generic approach as the other IOs.
>>
>> package org.apache.beam.sdk.io.influxdb;
>>
>> import static com.google.common.base.Preconditions.checkArgument;
>> import static com.google.common.base.Preconditions.checkNotNull;
>> import static com.google.common.base.Preconditions.checkState;
>>
>> import com.google.auto.value.AutoValue;
>> import org.apache.beam.sdk.coders.Coder;
>> import org.apache.beam.sdk.coders.SerializableCoder;
>> import org.apache.beam.sdk.io.BoundedSource;
>> import org.apache.beam.sdk.options.PipelineOptions;
>> import org.apache.beam.sdk.transforms.DoFn;
>> import org.apache.beam.sdk.transforms.PTransform;
>> import org.apache.beam.sdk.transforms.ParDo;
>> import org.apache.beam.sdk.transforms.display.DisplayData;
>> import org.apache.beam.sdk.values.PBegin;
>> import org.apache.beam.sdk.values.PCollection;
>>
>> import org.apache.beam.sdk.values.PDone;
>> import org.influxdb.InfluxDB;
>> import org.influxdb.InfluxDBFactory;
>> import org.influxdb.dto.BatchPoints;
>> import org.influxdb.dto.Point;
>> import org.influxdb.dto.Query;
>> import org.influxdb.dto.QueryResult;
>> import org.slf4j.Logger;
>> import org.slf4j.LoggerFactory;
>>
>> import java.io.IOException;
>> import java.io.Serializable;
>> import java.util.ArrayList;
>> import java.util.Iterator;
>> import java.util.List;
>> import java.util.NoSuchElementException;
>> import java.util.concurrent.TimeUnit;
>> import javax.annotation.Nullable;
>>
>> /**
>>   * IO to read data from and write data to InfluxDB.
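>>   *
>>   * <p>A minimal read, matching the pipeline snippet quoted later in this
>>   * thread (the URI and database name are just those example values):
>>   * <pre>{@code
>>   * PCollection<String> output = pipeline.apply(
>>   *     InfluxDbIO.<String>read()
>>   *         .withUri("http://localhost:8086")
>>   *         .withDatabase("beam"));
>>   * }</pre>
>>   */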
>> public class InfluxDbIO {
>>      private static final Logger LOG = LoggerFactory.getLogger(InfluxDbIO.class);
>>
>>      private InfluxDbIO() {
>>      }
>>
>>      /**
>>       * Callback for the parser to use to submit data.
>>       */
>>      public interface ParserCallback<T> extends Serializable {
>>          /**
>>           * Output the parsed object.
>>           * @param output the parsed element to emit
>>           */
>>          void output(T output);
>>
>>      }
>>
>>      /**
>>       * Interface for the parser that is used to parse an InfluxDB result
>>       * entry into the appropriate type.
>>       * @param <T> type produced by the parser
>>       */
>>      public interface Parser<T> extends Serializable {
>>          void parse(String input, ParserCallback<T> callback) throws
>> IOException;
>>      }
>>
>>      /**
>>       * For the default {@code Read<String>} case, this is the parser that
>>       * passes each input String through unchanged.
>>       */
>>      private static final Parser<String> TEXT_PARSER = new
>> Parser<String>() {
>>          @Override
>>          public void parse(String input, ParserCallback<String> callback)
>>                  throws IOException {
>>              callback.output(input);
>>          }
>>      };
>>
>>      /**
>>       * Read data from InfluxDB.
>>       *
>>       * @param <T> Type of the data to be read.
>>       */
>>      public static <T> Read<T> read() {
>>          return new AutoValue_InfluxDbIO_Read.Builder<T>().build();
>>      }
>>
>>      /**
>>       * A {@link PTransform} to read data from InfluxDB.
>>       */
>>      @AutoValue
>>      public abstract static class Read<T> extends PTransform<PBegin,
>> PCollection<T>> {
>>          @Nullable abstract Parser<T> getparser();
>>          @Nullable abstract String geturi();
>>          @Nullable abstract String getdatabase();
>>          @Nullable abstract Coder<T> getCoder();
>>
>>          abstract Builder<T> toBuilder();
>>
>>          @AutoValue.Builder
>>          abstract static class Builder<T> {
>>              abstract Builder<T> setParser(Parser<T> parser);
>>              abstract Builder<T> setUri(String uri);
>>              abstract Builder<T> setDatabase(String database);
>>              abstract Builder<T> setCoder(Coder<T> coder);
>>              abstract Read<T> build();
>>          }
>>
>>          public <X> Read<X> withParser(Parser<X> parser) {
>>              checkNotNull(parser);
>>              Builder<X> builder = (Builder<X>) toBuilder();
>>              return builder.setParser(parser).setCoder(null).build();
>>          }
>>
>>          /**
>>           * Define the InfluxDB connection URI, e.g. http://localhost:8086.
>>           */
>>          public Read<T> withUri(String uri) {
>>              checkNotNull(uri);
>>              return toBuilder().setUri(uri).build();
>>          }
>>
>>          public Read<T> withDatabase(String database) {
>>              checkNotNull(database);
>>              return toBuilder().setDatabase(database).build();
>>          }
>>
>>          public Read<T> withCoder(Coder<T> coder) {
>>              checkArgument(coder != null,
>> "InfluxDbIO.read().withCoder(coder) called with null coder");
>>              return toBuilder().setCoder(coder).build();
>>          }
>>
>>          @Override
>>          public PCollection<T> expand(PBegin input) {
>>
>>              PCollection<String> influxDb =
>> input.apply(org.apache.beam.sdk.io.Read.from(new
>> BoundedInfluxDbSource(this)));
>>
>>              System.out.println("***************Get InfluxDB ***********");
>>
>>              PCollection<String> output = influxDb.apply("Print List",
>>                      ParDo.of(new DoFn<String, String>() {
>>
>>                          @ProcessElement
>>                          public void processElement(ProcessContext c) {
>>                              System.out.println("Print List: " +
>> c.element());
>>                              c.output(c.element());
>>                          }
>>                      }));
>>
>>
>>              PCollection<T> poutput = output.apply(ParDo.of(new
>> DoFn<String, T>() {
>>
>>                  @ProcessElement
>>                  public void processElement(final ProcessContext c)
>> throws IOException {
>>
>> System.out.println("***************processElement***********");
>>
>>                      String entry = c.element();
>>                      getparser().parse(entry, new ParserCallback<T>() {
>>                          @Override
>>                          public void output(T output) {
>>                              c.output(output);
>>                          }
>>                      });
>>                  }
>>
>>              }));
>>
>>              System.out.println("***************Get InfluxDB Finished
>> ***********");
>>
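>>              // Other IOs that expose withCoder(), such as JdbcIO, typically
>>              // apply the configured coder to the output PCollection at this
>>              // point (e.g. poutput.setCoder(getCoder())).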
>>              return poutput;
>>          }
>>
>>          @Override
>>          public void validate(PBegin input) {
>>              checkNotNull(geturi(), "uri");
>>              checkNotNull(getdatabase(), "database");
>>              checkState(getCoder() != null,
>>                      "InfluxDBIO.read() requires a coder to be set via
>> withCoder(coder)");
>>          }
>>
>>          @Override
>>          public void populateDisplayData(DisplayData.Builder builder) {
>>              super.populateDisplayData(builder);
>>              builder.add(DisplayData.item("uri", geturi()));
>>              builder.add(DisplayData.item("database", getdatabase()));
>>              builder.add(DisplayData.item("coder",
>> getCoder().getClass().getName()));
>>          }
>>
>>          /** A {@link DoFn} that forwards each read entry, casting it to the output type. */
>>          static class ReadFn<T> extends DoFn<String, T> {
>>              private InfluxDbIO.Read<T> spec;
>>
>>              private ReadFn(Read<T> spec) {
>>                  this.spec = spec;
>>              }
>>
>>              @ProcessElement
>>              public void processElement(ProcessContext context) throws
>> Exception {
>>                  System.out.println("InfluxDB entry: " +
>> context.element());
>>                  context.output((T) context.element()); // Rama: Need to check the implementation
>>              }
>>          }
>>      }
>>
>>      private static class BoundedInfluxDbSource extends
>> BoundedSource<String> {
>>          private Read spec;
>>
>>          public BoundedInfluxDbSource(Read spec) {
>>              this.spec = spec;
>>          }
>>
>>          @Override
>>          public Coder<String> getDefaultOutputCoder() {
>>              return SerializableCoder.of(String.class);
>>              //StringUtf8Coder.of();
>>              //SerializableCoder.of(String.class);
>>          }
>>
>>          @Override
>>          public void validate() {
>>              spec.validate(null);
>>          }
>>
>>          @Override
>>          public void populateDisplayData(DisplayData.Builder builder) {
>>              spec.populateDisplayData(builder);
>>          }
>>
>>          @Override
>>          public BoundedReader<String> createReader(PipelineOptions
>> options) throws IOException {
>>              return new BoundedInfluxDbReader(this);
>>          }
>>
>>          @Override
>>          public long getEstimatedSizeBytes(PipelineOptions options)
>> throws Exception {
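>>              // Estimates the size by counting the rows returned by a
>>              // "SELECT *" query; this is a row count, not a byte count.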
>>              System.out.println("getEstimatedSizeBytes -->");
>>              InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
>>              influxDB.createDatabase(spec.getdatabase());
>>              Query query = new Query("SELECT * FROM " +
>> spec.getdatabase(), spec.getdatabase());
>>              QueryResult queryResult = influxDB.query(query);
>>
>>              List databaseNames =
>> queryResult.getResults().get(0).getSeries().get(0).getValues();
>>
>>              int size = 0;
>>              if(databaseNames != null) {
>>                  Iterator var4 = databaseNames.iterator();
>>
>>                  while(var4.hasNext()) {
>>                      List database = (List)var4.next();
>>                      size += database.size();
>>                  }
>>              }
>>
>>              return size;
>>          }
>>
>>          @Override
>>          public List<? extends BoundedSource<String>>
>> splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
>> throws Exception {
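>>              // Currently returns this whole source once per result row
>>              // rather than splitting the query into independent shards.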
>>              List<BoundedSource<String>> sources = new
>> ArrayList<BoundedSource<String>>();
>>
>>              InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
>>              influxDB.createDatabase(spec.getdatabase());
>>              Query query = new Query("SELECT * FROM " +
>> spec.getdatabase(), spec.getdatabase());
>>              QueryResult queryResult = influxDB.query(query);
>>
>>              List databaseNames =
>> queryResult.getResults().get(0).getSeries().get(0).getValues();
>>
>>              int size = 0;
>>              if(databaseNames != null) {
>>                  Iterator var4 = databaseNames.iterator();
>>
>>                  while(var4.hasNext()) {
>>                      List database = (List)var4.next();
>>                      sources.add(this);// Need to check .. Rama
>>                  }
>>              }
>>
>>              checkArgument(!sources.isEmpty(), "No primary shard found");
>>              return sources;
>>          }
>>
>>      }
>>
>>      private static class BoundedInfluxDbReader extends
>> BoundedSource.BoundedReader<String> {
>>          private final BoundedInfluxDbSource source;
>>
>>          private Iterator cursor;
>>          private List current;
>>          private InfluxDB influxDB;
>>
>>          public BoundedInfluxDbReader(BoundedInfluxDbSource source) {
>>              this.source = source;
>>          }
>>
>>          @Override
>>          public boolean start() throws IOException {
>>              Read spec = source.spec;
>>
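>>              // Connect, run a single "SELECT *" query against the database,
>>              // and iterate over the returned rows.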
>>              System.out.println("*******Query InfluxDb********* "   +
>> spec.getdatabase() + " URI: " + spec.geturi());
>>              influxDB = InfluxDBFactory.connect(spec.geturi());
>>              influxDB.createDatabase(spec.getdatabase());
>>              Query query = new Query("SELECT * FROM " +
>> spec.getdatabase(), spec.getdatabase());
>>              QueryResult queryResult = influxDB.query(query);
>>
>>              List databaseNames =
>> queryResult.getResults().get(0).getSeries().get(0).getValues();
>>
>>              if(databaseNames != null) {
>>                  cursor = databaseNames.iterator();
>>              }
>>
>>              return advance();
>>          }
>>
>>          @Override
>>          public boolean advance() throws IOException {
>>              if (cursor.hasNext()) {
>>                  current = (List) cursor.next();
>>                  return true;
>>              } else {
>>                  return false;
>>              }
>>          }
>>
>>          @Override
>>          public BoundedSource<String> getCurrentSource() {
>>              return source;
>>          }
>>
>>          @Override
>>          public String getCurrent() throws NoSuchElementException {
>>              return current.toString();
>>          }
>>
>>          @Override
>>          public void close() throws IOException {
>>              // Nothing to close here yet; the query result is fully
>>              // materialized in start().
>>          }
>>      }
>>
>>      /** Write data to InfluxDB. */
>>      public static Write write() {
>>          return new AutoValue_InfluxDbIO_Write.Builder().setBatchSize(1024L).build();
>>      }
>>
>>      /**
>>       * A {@link PTransform} to write to an InfluxDB database.
>>       */
>>      @AutoValue
>>      public abstract static class Write extends
>> PTransform<PCollection<String>, PDone> {
>>          @Nullable abstract String uri();
>>          @Nullable abstract String database();
>>          abstract long batchSize();
>>
>>          abstract Builder toBuilder();
>>
>>          @AutoValue.Builder
>>          abstract static class Builder {
>>              abstract Builder setUri(String uri);
>>              abstract Builder setDatabase(String database);
>>              abstract Builder setBatchSize(long batchSize);
>>              abstract Write build();
>>          }
>>
>>          public Write withUri(String uri) {
>>              return toBuilder().setUri(uri).build();
>>          }
>>
>>          public Write withDatabase(String database) {
>>              return toBuilder().setDatabase(database).build();
>>          }
>>
>>          public Write withBatchSize(long batchSize) {
>>              return toBuilder().setBatchSize(batchSize).build();
>>          }
>>
>>          @Override
>>          public PDone expand(PCollection<String> input) {
>>              input.apply(ParDo.of(new WriteFn(this)));
>>              return PDone.in(input.getPipeline());
>>          }
>>
>>          @Override
>>          public void validate(PCollection<String> input) {
>>              checkNotNull(uri(), "uri");
>>              checkNotNull(database(), "database");
>>              checkNotNull(batchSize(), "batchSize");
>>          }
>>
>>          private static class WriteFn extends DoFn<String, Void> {
>>              private final Write spec;
>>              private InfluxDB influxDB;
>>              private List<String> batch;
>>              public WriteFn(Write spec) {
>>                  this.spec = spec;
>>              }
>>
>>              @Setup
>>              public void createInfluxDb() throws Exception {
>>                  System.out.println("*******createInfluxDb********* "
>>                          + spec.database() + " URI: " + spec.uri());
>>                  if (influxDB == null) {
>>                      influxDB = InfluxDBFactory.connect(spec.uri());
>>                      influxDB.createDatabase(spec.database());
>>                      System.out.println("*******createDatabase: " +
>> spec.database());
>>                  }
>>              }
>>
>>              @StartBundle
>>              public void startBundle(Context ctx) throws Exception {
>>                  batch = new ArrayList<String>();
>>              }
>>
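>>              // Each element is currently written as a single Point wrapped
>>              // in its own BatchPoints; the batch list and batchSize() are
>>              // not used yet.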
>>              @ProcessElement
>>              public void processElement(ProcessContext ctx) throws
>> Exception {
>>                  System.out.println("*******processElement: " +
>> ctx.element());
>>
>>                  BatchPoints batchPoint;
>>                  batchPoint = BatchPoints
>>                          .database(spec.database())
>>                          .tag("sflow", ctx.element())
>>                          .retentionPolicy("autogen")
>>                          .consistency(InfluxDB.ConsistencyLevel.ALL)
>>                          .build();
>>
>>                  Point point1 = Point.measurement(spec.database())
>>                          .time(System.currentTimeMillis(),
>> TimeUnit.MILLISECONDS)
>>                          .addField("sflow", ctx.element())
>>                          .build();
>>                  batchPoint.point(point1);
>>
>>                  influxDB.write(batchPoint);
>>
>>                  // TODO: buffer elements in the batch list and flush in
>>                  // finishBundle() once batch.size() >= spec.batchSize(),
>>                  // instead of writing each element individually.
>>              }
>>
>>              @FinishBundle
>>              public void finishBundle(Context ctx) throws Exception {
>>                  // TODO: write the buffered batch to InfluxDB here once
>>                  // batching is implemented.
>>
>>                  batch.clear();
>>              }
>>
>>              @Teardown
>>              public void closeInfluxClient() throws Exception {
>>                  //influxDB.deleteDatabase(spec.database());
>>                  //influxDB = null;
>>
>> /*
>>                  Query query = new Query("SELECT * FROM " +
>> spec.database(),spec.database());
>>
>>                  QueryResult queryResult = influxDB.query(query);
>>
>>                  List<QueryResult.Result> allresults =
>> queryResult.getResults();
>>
>>                  System.out.println(queryResult.getResults().get(0)
>>                          .getSeries().get(0).getValues());
>>
>>                  List temp = queryResult.getResults().get(0)
>>                          .getSeries().get(0).getValues();
>>
>>                  if(temp != null) {
>>                      Iterator var4 = temp.iterator();
>>
>>                      System.out.println("Start.....");
>>                      while(var4.hasNext()) {
>>                          List database = (List)var4.next();
>>                          System.out.println(database.toString());
>>                      }
>>                  }
>> //
>> //                for(List<Object> ll : temp) {
>> //                    System.out.println(ll.get(0));
>> //                    System.out.println(ll.get(1));
>> //                }
>>           */
>>
>>                  System.out.println("*******deleteDatabase: " +
>> spec.database());
>>              }
>>          }
>>      }
>> }
>>
>>
>>
>> On Fri, Jun 30, 2017 at 11:03 AM, Jean-Baptiste Onofré <[email protected]>
>> wrote:
>>
>> Hi,
>>>
>>> Can you share your code? I will fix that with you. I suggest checking the
>>> expand() method in the read PTransform and the way you use generics there.
>>>
>>> Any plan to donate this IO? I would be happy to review the PR!
>>>
>>> Do you leverage some InfluxDB features (like splitting/sharding)?
>>>
>>> Regards
>>> JB
>>>
>>>
>>> On 06/30/2017 07:26 AM, P. Ramanjaneya Reddy wrote:
>>>
>>> Hi Beam Dev,
>>>>
>>>> We have developed our own SDK IO functions for InfluxDB read/write
>>>> operations in Apache Beam. It works with the default coder, which is
>>>> StringUtf8Coder.of().
>>>>
>>>>    PCollection<String> output = pipeline.apply(
>>>>               InfluxDbIO.<String>read()
>>>>                       .withUri("http://localhost:8086")
>>>>                       .withDatabase("beam"));
>>>>
>>>>
>>>>
>>>>
>>>> With reference to the MongoDB and JDBC IOs, we also implemented the read
>>>> function with a setCoder() option in InfluxDB, but it is not working.
>>>>
>>>>       PCollection<String> output = pipeline.apply(
>>>>               InfluxDbIO.<String>read()
>>>>                       .withParser(new InfluxDbIO.Parser<String>() {
>>>>                         @Override
>>>>                         public void parse(String input,
>>>>                                         InfluxDbIO.ParserCallback<String> callback) throws IOException {
>>>>                           callback.output(input);
>>>>                         }
>>>>                       })
>>>>                       .withUri("http://localhost:8086")
>>>>                       .withDatabase("beam")
>>>>                       .withCoder(StringUtf8Coder.of())); ----> with coder, getting the error:
>>>>
>>>> java.lang.ClassCastException: org.apache.beam.sdk.values.PBegin cannot be
>>>> cast to org.apache.beam.sdk.values.PCollection
>>>>
>>>> Thanks & Regards,
>>>> Ramanjaneya
>>>>
>>>>
>>>> --
>>> Jean-Baptiste Onofré
>>> [email protected]
>>> http://blog.nanthrax.net
>>> Talend - http://www.talend.com
>>>
>>>
>>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
