Hi JB,

Here is the complete code for the InfluxDB read/write transforms. I have
followed the same generic pattern as the other IOs.
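For context, here is a minimal sketch of the intended usage (based on the
working example from my earlier mail below: a local InfluxDB at
http://localhost:8086 and a database named "beam"); the full IO code follows.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class InfluxDbIOUsage {
    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // Read every entry of the "beam" database as Strings (default parser).
        PCollection<String> rows = pipeline.apply(
                InfluxDbIO.<String>read()
                        .withUri("http://localhost:8086")
                        .withDatabase("beam"));

        // Write each String back as a point into the same database.
        rows.apply(
                InfluxDbIO.write()
                        .withUri("http://localhost:8086")
                        .withDatabase("beam"));

        pipeline.run().waitUntilFinish();
    }
}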

package org.apache.beam.sdk.io.influxdb;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

import org.apache.beam.sdk.values.PDone;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;

/**
 * IO to read data from and write data to InfluxDB.
 */
public class InfluxDbIO {
    private static final Logger LOG = LoggerFactory.getLogger(InfluxDbIO.class);

    private InfluxDbIO() {
    }

    /**
     * Callback for the parser to use to submit data.
     */
    public interface ParserCallback<T> extends Serializable {
        /**
         * Output the parsed object downstream.
         * @param output the parsed value to emit
         */
        void output(T output);

    }

    /**
     * Interface for the parser that is used to parse an InfluxDB entry into
     * the appropriate type.
     * @param <T> the type produced by the parser
     */
    public interface Parser<T> extends Serializable {
        void parse(String input, ParserCallback<T> callback) throws IOException;
    }

    /**
     * For the default {@code Read<String>} case, this is the parser that is used:
     * it simply outputs each InfluxDB entry as a String.
     */
    private static final Parser<String> TEXT_PARSER = new Parser<String>() {
        @Override
        public void parse(String input, ParserCallback<String> callback)
                throws IOException {
            callback.output(input);
        }
    };

    /**
     * Read data from InfluxDB.
     *
     * @param <T> Type of the data to be read.
     */
    public static <T> Read<T> read() {
        return new AutoValue_InfluxDbIO_Read.Builder<T>().build();
    }

    /**
     * A {@link PTransform} to read data from InfluxDB.
     */
    @AutoValue
    public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {
        @Nullable abstract Parser<T> getparser();
        @Nullable abstract String geturi();
        @Nullable abstract String getdatabase();
        @Nullable abstract Coder<T> getCoder();

        abstract Builder<T> toBuilder();

        @AutoValue.Builder
        abstract static class Builder<T> {
            abstract Builder<T> setParser(Parser<T> parser);
            abstract Builder<T> setUri(String uri);
            abstract Builder<T> setDatabase(String database);
            abstract Builder<T> setCoder(Coder<T> coder);
            abstract Read<T> build();
        }

        public <X> Read<X> withParser(Parser<X> parser) {
            checkNotNull(parser);
            Builder<X> builder = (Builder<X>) toBuilder();
            return builder.setParser(parser).setCoder(null).build();
        }

        /**
         * Define the InfluxDB connection URI to read from.
         */
        public Read<T> withUri(String uri) {
            checkNotNull(uri);
            return toBuilder().setUri(uri).build();
        }

        public Read<T> withDatabase(String database) {
            checkNotNull(database);
            return toBuilder().setDatabase(database).build();
        }

        public Read<T> withCoder(Coder<T> coder) {
            checkArgument(coder != null,
                    "InfluxDbIO.read().withCoder(coder) called with null coder");
            return toBuilder().setCoder(coder).build();
        }

        @Override
        public PCollection<T> expand(PBegin input) {

            PCollection<String> influxDb =
                    input.apply(org.apache.beam.sdk.io.Read.from(new BoundedInfluxDbSource(this)));

            System.out.println("***************Get InfluxDB ***********");

            PCollection<String> output = influxDb.apply("Print List",
                    ParDo.of(new DoFn<String, String>() {

                        @ProcessElement
                        public void processElement(ProcessContext c) {
                            System.out.println("Print List: " + c.element());
                            c.output(c.element());
                        }
                    }));


            PCollection<T> poutput = output.apply(ParDo.of(new DoFn<String, T>() {

                @ProcessElement
                public void processElement(final ProcessContext c) throws IOException {

                    System.out.println("***************processElement***********");

                    String entry = c.element();
                    getparser().parse(entry, new ParserCallback<T>() {
                        @Override
                        public void output(T output) {
                            c.output(output);
                        }
                    });
                }

            }));

            System.out.println("***************Get InfluxDB Finished ***********");

            return poutput;
        }

        @Override
        public void validate(PBegin input) {
            checkNotNull(geturi(), "uri");
            checkNotNull(getdatabase(), "database");
            checkState(getCoder() != null,
                    "InfluxDbIO.read() requires a coder to be set via withCoder(coder)");
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            super.populateDisplayData(builder);
            builder.add(DisplayData.item("uri", geturi()));
            builder.add(DisplayData.item("database", getdatabase()));
            builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
        }

        /** A {@link DoFn} converting raw InfluxDB entries into the output type. */
        static class ReadFn<T> extends DoFn<String, T> {
            private InfluxDbIO.Read<T> spec;

            private ReadFn(Read<T> spec) {
                this.spec = spec;
            }

            @ProcessElement
            public void processElement(ProcessContext context) throws Exception {
                System.out.println("InfluxDB entry: " + context.element());
                context.output((T) context.element()); // Rama: Need to check the implementation
            }
        }
    }

    private static class BoundedInfluxDbSource extends BoundedSource<String> {
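        // Bounded source that reads all entries of the configured database with a single
        // "SELECT *" query; splitIntoBundles() currently returns copies of this source
        // rather than real splits.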
        private Read spec;

        public BoundedInfluxDbSource(Read spec) {
            this.spec = spec;
        }

        @Override
        public Coder<String> getDefaultOutputCoder() {
            return SerializableCoder.of(String.class);
            // Alternative considered: StringUtf8Coder.of()
        }

        @Override
        public void validate() {
            spec.validate(null);
        }

        @Override
        public void populateDisplayData(DisplayData.Builder builder) {
            spec.populateDisplayData(builder);
        }

        @Override
        public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
            return new BoundedInfluxDbReader(this);
        }

        @Override
        public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
            System.out.println("getEstimatedSizeBytes -->");
            InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
            influxDB.createDatabase(spec.getdatabase());
            Query query = new Query("SELECT * FROM " + spec.getdatabase(), spec.getdatabase());
            QueryResult queryResult = influxDB.query(query);

            List values = queryResult.getResults().get(0).getSeries().get(0).getValues();

            long size = 0;
            if (values != null) {
                Iterator iterator = values.iterator();

                while (iterator.hasNext()) {
                    List row = (List) iterator.next();
                    size += row.size();
                }
            }

            return size;
        }

        @Override
        public List<? extends BoundedSource<String>> splitIntoBundles(
                long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
            List<BoundedSource<String>> sources = new ArrayList<BoundedSource<String>>();

            InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
            influxDB.createDatabase(spec.getdatabase());
            Query query = new Query("SELECT * FROM " + spec.getdatabase(), spec.getdatabase());
            QueryResult queryResult = influxDB.query(query);

            List values = queryResult.getResults().get(0).getSeries().get(0).getValues();

            if (values != null) {
                Iterator iterator = values.iterator();

                while (iterator.hasNext()) {
                    iterator.next();
                    sources.add(this); // Need to check .. Rama
                }
            }

            checkArgument(!sources.isEmpty(), "No sources to read from");
            return sources;
        }

    }

    private static class BoundedInfluxDbReader extends BoundedSource.BoundedReader<String> {
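        // Runs the query once in start() and then iterates over the returned rows in memory.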
        private final BoundedInfluxDbSource source;

        private Iterator cursor;
        private List current;
        private InfluxDB influxDB;

        public BoundedInfluxDbReader(BoundedInfluxDbSource source) {
            this.source = source;
        }

        @Override
        public boolean start() throws IOException {
            Read spec = source.spec;

            System.out.println("*******Query InfluxDb********* " + spec.getdatabase() + " URI: " + spec.geturi());
            influxDB = InfluxDBFactory.connect(spec.geturi());
            influxDB.createDatabase(spec.getdatabase());
            Query query = new Query("SELECT * FROM " + spec.getdatabase(), spec.getdatabase());
            QueryResult queryResult = influxDB.query(query);

            List values = queryResult.getResults().get(0).getSeries().get(0).getValues();

            if (values != null) {
                cursor = values.iterator();
            }

            return advance();
        }

        @Override
        public boolean advance() throws IOException {
            if (cursor != null && cursor.hasNext()) {
                current = (List) cursor.next();
                return true;
            } else {
                return false;
            }
        }

        @Override
        public BoundedSource<String> getCurrentSource() {
            return source;
        }

        @Override
        public String getCurrent() throws NoSuchElementException {
            if (current == null) {
                throw new NoSuchElementException();
            }
            return current.toString();
        }

        @Override
        public void close() throws IOException {
            // Nothing to release here: the query result is fully materialized in start().
        }
    }

    /** Write data to InfluxDB. */
    public static Write write() {
        return new AutoValue_InfluxDbIO_Write.Builder().setBatchSize(1024L).build();
    }

    /**
     * A {@link PTransform} to write to an InfluxDB database.
     */
    @AutoValue
    public abstract static class Write extends PTransform<PCollection<String>, PDone> {
        @Nullable abstract String uri();
        @Nullable abstract String database();
        abstract long batchSize();

        abstract Builder toBuilder();

        @AutoValue.Builder
        abstract static class Builder {
            abstract Builder setUri(String uri);
            abstract Builder setDatabase(String database);
            abstract Builder setBatchSize(long batchSize);
            abstract Write build();
        }

        public Write withUri(String uri) {
            return toBuilder().setUri(uri).build();
        }

        public Write withDatabase(String database) {
            return toBuilder().setDatabase(database).build();
        }

        public Write withBatchSize(long batchSize) {
            return toBuilder().setBatchSize(batchSize).build();
        }

        @Override
        public PDone expand(PCollection<String> input) {
            input.apply(ParDo.of(new WriteFn(this)));
            return PDone.in(input.getPipeline());
        }

        @Override
        public void validate(PCollection<String> input) {
            checkNotNull(uri(), "uri");
            checkNotNull(database(), "database");
            checkArgument(batchSize() > 0, "batchSize must be greater than 0");
        }

        private static class WriteFn extends DoFn<String, Void> {
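            // Writes each incoming element to InfluxDB as a single point in the configured database.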
            private final Write spec;
            private InfluxDB influxDB;
            private List<String> batch;
            public WriteFn(Write spec) {
                this.spec = spec;
            }

            @Setup
            public void createInfluxDb() throws Exception {
                System.out.println("*******createInfluxDb********* "
                        + spec.database() + " URI: " + spec.uri());
                if (influxDB == null) {
                    influxDB = InfluxDBFactory.connect(spec.uri());
                    influxDB.createDatabase(spec.database());
                    System.out.println("*******createDatabase: " + spec.database());
                }
            }

            @StartBundle
            public void startBundle(Context ctx) throws Exception {
                batch = new ArrayList<String>();
            }

            @ProcessElement
            public void processElement(ProcessContext ctx) throws Exception {
                System.out.println("*******processElement: " + ctx.element());

                BatchPoints batchPoint;
                batchPoint = BatchPoints
                        .database(spec.database())
                        .tag("sflow", ctx.element())
                        .retentionPolicy("autogen")
                        .consistency(InfluxDB.ConsistencyLevel.ALL)
                        .build();

                Point point1 = Point.measurement(spec.database())
                        .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
                        .addField("sflow", ctx.element())
                        .build();
                batchPoint.point(point1);

                influxDB.write(batchPoint);

                // TODO: batch points instead of writing each element individually, e.g.:
                //     batch.add(ctx.element());
                //     if (batch.size() >= spec.batchSize()) {
                //         finishBundle(ctx);
                //     }
            }

            @FinishBundle
            public void finishBundle(Context ctx) throws Exception {
                // TODO: flush any buffered points to InfluxDB here once batching is in place.
                batch.clear();
            }

            @Teardown
            public void closeInfluxClient() throws Exception {
                //influxDB.deleteDatabase(spec.database());
                //influxDB = null;

/*
                Query query = new Query("SELECT * FROM " + spec.database(), spec.database());

                QueryResult queryResult = influxDB.query(query);

                List<QueryResult.Result> allresults = queryResult.getResults();

                System.out.println(queryResult.getResults().get(0)
                        .getSeries().get(0).getValues());

                List temp = queryResult.getResults().get(0)
                        .getSeries().get(0).getValues();

                if(temp != null) {
                    Iterator var4 = temp.iterator();

                    System.out.println("Start.....");
                    while(var4.hasNext()) {
                        List database = (List)var4.next();
                        System.out.println(database.toString());
                    }
                }
//
//                for(List<Object> ll : temp) {
//                    System.out.println(ll.get(0));
//                    System.out.println(ll.get(1));
//                }
         */

                System.out.println("*******closeInfluxClient: " + spec.database());
            }
        }
    }
}



On Fri, Jun 30, 2017 at 11:03 AM, Jean-Baptiste Onofré <[email protected]>
wrote:

> Hi,
>
> can you share your code? I will fix that with you. I suggest checking the
> expand() method in the read PTransform and the way you use generics there.
>
> Any plan to donate this IO? I would be happy to review the PR!
>
> Do you leverage some InfluxDB feature (like splitting/sharding) ?
>
> Regards
> JB
>
>
> On 06/30/2017 07:26 AM, P. Ramanjaneya Reddy wrote:
>
>> Hi Beam Dev,
>>
>> We have developed our own SDK IO functions for InfluxDB read/write
>> operations in Apache Beam. It works with the default coder, which is
>> StringUtf8Coder.of().
>>
>>   PCollection<String> output = pipeline.apply(
>>              InfluxDbIO.<String>read()
>>                      .withUri("http://localhost:8086")
>>                      .withDatabase("beam"));
>>
>>
>>
>>
>> With reference to the MongoDB and JDBC IOs, I implemented the read function
>> with the setCoder() option in InfluxDB as well, but it is not working.
>>
>>      PCollection<String> output = pipeline.apply(
>>              InfluxDbIO.<String>read()
>>                      .withParser(new InfluxDbIO.Parser<String>() {
>>                        @Override
>>                        public void parse(String input,
>>                                        InfluxDbIO.ParserCallback<String>
>> callback) throws IOException {
>>                          callback.output(input);
>>                        }
>>                      })
>>                      .withUri("http://localhost:8086")
>>                      .withDatabase("beam")
>>                      .withCoder(StringUtf8Coder.of())); ----> with the coder,
>> getting the error:
>>
>> java.lang.ClassCastException: org.apache.beam.sdk.values.PBegin cannot be
>> cast to org.apache.beam.sdk.values.PCollection
>>
>> Thanks & Regards,
>> Ramanjaneya
>>
>>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>
