Thanks, let me paste it in an editor and do a review. I'll keep you posted. By the way, are you on Slack or Hangouts to chat quickly?
Regards
JB

On 06/30/2017 07:46 AM, P. Ramanjaneya Reddy wrote:
Hi JB,

Here is the complete code for the InfluxDB read/write methods. I have followed the same generic approach as the other IOs.

package org.apache.beam.sdk.io.influxdb;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;

import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

/**
 * IO to read from and write to InfluxDB.
 */
public class InfluxDbIO {

  private static final Logger LOG = LoggerFactory.getLogger(InfluxDbIO.class);

  private InfluxDbIO() {
  }

  /**
   * Callback used by the parser to submit data.
   */
  public interface ParserCallback<T> extends Serializable {
    /**
     * Outputs the parsed object.
     *
     * @param output the parsed element
     */
    void output(T output);
  }

  /**
   * Interface for the parser that turns an InfluxDB record into the appropriate type.
   *
   * @param <T> type produced by the parser
   */
  public interface Parser<T> extends Serializable {
    void parse(String input, ParserCallback<T> callback) throws IOException;
  }

  /**
   * For the default {@code Read<String>} case, this parser simply passes the raw
   * record through as a String.
   */
  private static final Parser<String> TEXT_PARSER = new Parser<String>() {
    @Override
    public void parse(String input, ParserCallback<String> callback) throws IOException {
      callback.output(input);
    }
  };

  /**
   * Read data from InfluxDB.
   *
   * @param <T> type of the data to be read
   */
  public static <T> Read<T> read() {
    return new AutoValue_InfluxDbIO_Read.Builder<T>().build();
  }

  /**
   * A {@link PTransform} to read data from InfluxDB.
   */
  @AutoValue
  public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>> {

    @Nullable abstract Parser<T> getparser();
    @Nullable abstract String geturi();
    @Nullable abstract String getdatabase();
    @Nullable abstract Coder<T> getCoder();

    abstract Builder<T> toBuilder();

    @AutoValue.Builder
    abstract static class Builder<T> {
      abstract Builder<T> setParser(Parser<T> parser);
      abstract Builder<T> setUri(String uri);
      abstract Builder<T> setDatabase(String database);
      abstract Builder<T> setCoder(Coder<T> coder);
      abstract Read<T> build();
    }

    public <X> Read<X> withParser(Parser<X> parser) {
      checkNotNull(parser);
      Builder<X> builder = (Builder<X>) toBuilder();
      return builder.setParser(parser).setCoder(null).build();
    }
    /** Example documentation for withUri. */
    public Read<T> withUri(String uri) {
      checkNotNull(uri);
      return toBuilder().setUri(uri).build();
    }

    public Read<T> withDatabase(String database) {
      checkNotNull(database);
      return toBuilder().setDatabase(database).build();
    }

    public Read<T> withCoder(Coder<T> coder) {
      checkArgument(coder != null, "InfluxDbIO.read().withCoder(coder) called with null coder");
      return toBuilder().setCoder(coder).build();
    }

    @Override
    public PCollection<T> expand(PBegin input) {
      PCollection<String> inflxDb =
          input.apply(org.apache.beam.sdk.io.Read.from(new BoundedInfluxDbSource(this)));
      System.out.println("***************Get InfluxDB ***********");
      PCollection<String> output = inflxDb.apply("Print List",
          ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              System.out.println("Print List: " + c.element());
              c.output(c.element());
            }
          }));
      PCollection<T> poutput = output.apply(ParDo.of(new DoFn<String, T>() {
        @ProcessElement
        public void processElement(final ProcessContext c) throws IOException {
          System.out.println("***************processElement***********");
          String entry = c.element();
          getparser().parse(entry, new ParserCallback<T>() {
            @Override
            public void output(T output) {
              c.output(output);
            }
          });
        }
      }));
      System.out.println("***************Get InfluxDB Finished ***********");
      return poutput;
    }

    @Override
    public void validate(PBegin input) {
      checkNotNull(geturi(), "uri");
      checkNotNull(getdatabase(), "database");
      checkState(getCoder() != null,
          "InfluxDBIO.read() requires a coder to be set via withCoder(coder)");
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      super.populateDisplayData(builder);
      builder.add(DisplayData.item("uri", geturi()));
      builder.add(DisplayData.item("database", getdatabase()));
      builder.add(DisplayData.item("coder", getCoder().getClass().getName()));
    }

    /** A {@link DoFn} executing the SQL query to read from the database. */
    static class ReadFn<T> extends DoFn<String, T> {
      private InfluxDbIO.Read<T> spec;

      private ReadFn(Read<T> spec) {
        this.spec = spec;
      }

      @ProcessElement
      public void processElement(ProcessContext context) throws Exception {
        System.out.println("InfluxDB entry: " + context.element());
        context.output((T) context.element()); // Rama: Need to check the implementation
      }
    }
  }

  private static class BoundedInfluxDbSource extends BoundedSource<String> {

    private Read spec;

    public BoundedInfluxDbSource(Read spec) {
      this.spec = spec;
    }

    @Override
    public Coder<String> getDefaultOutputCoder() {
      return SerializableCoder.of(String.class); //StringUtf8Coder.of();
    }

    @Override
    public void validate() {
      spec.validate(null);
    }

    @Override
    public void populateDisplayData(DisplayData.Builder builder) {
      spec.populateDisplayData(builder);
    }

    @Override
    public BoundedReader<String> createReader(PipelineOptions options) throws IOException {
      return new BoundedInfluxDbReader(this);
    }

    @Override
    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
      System.out.println("getEstimatedSizeBytes -->");
      InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
      influxDB.createDatabase(spec.getdatabase());
      Query query = new Query("SELECT * FROM " + spec.getdatabase(), spec.getdatabase());
      QueryResult queryResult = influxDB.query(query);
      List databaseNames = queryResult.getResults().get(0).getSeries().get(0).getValues();
      int size = 0;
      if (databaseNames != null) {
        Iterator var4 = databaseNames.iterator();
        while (var4.hasNext()) {
          List database = (List) var4.next();
          size += database.size();
        }
      }
      return size;
    }

    @Override
    public List<? extends BoundedSource<String>> splitIntoBundles(long desiredBundleSizeBytes,
        PipelineOptions options) throws Exception {
      List<BoundedSource<String>> sources = new ArrayList<BoundedSource<String>>();
      InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
      influxDB.createDatabase(spec.getdatabase());
      Query query = new Query("SELECT * FROM " + spec.getdatabase(), spec.getdatabase());
      QueryResult queryResult = influxDB.query(query);
      List databaseNames = queryResult.getResults().get(0).getSeries().get(0).getValues();
      int size = 0;
      if (databaseNames != null) {
        Iterator var4 = databaseNames.iterator();
        while (var4.hasNext()) {
          List database = (List) var4.next();
          sources.add(this); // Need to check .. Rama
        }
      }
      checkArgument(!sources.isEmpty(), "No primary shard found");
      return sources;
    }
  }

  private static class BoundedInfluxDbReader extends BoundedSource.BoundedReader<String> {

    private final BoundedInfluxDbSource source;

    private Iterator cursor;
    private List current;
    private InfluxDB influxDB;

    public BoundedInfluxDbReader(BoundedInfluxDbSource source) {
      this.source = source;
    }

    @Override
    public boolean start() throws IOException {
      Read spec = source.spec;
      System.out.println("*******Query InfluxDb********* " + spec.getdatabase()
          + " URI: " + spec.geturi());
      influxDB = InfluxDBFactory.connect(spec.geturi());
      influxDB.createDatabase(spec.getdatabase());
      Query query = new Query("SELECT * FROM " + spec.getdatabase(), spec.getdatabase());
      QueryResult queryResult = influxDB.query(query);
      List databaseNames = queryResult.getResults().get(0).getSeries().get(0).getValues();
      if (databaseNames != null) {
        cursor = databaseNames.iterator();
      }
      return advance();
    }

    @Override
    public boolean advance() throws IOException {
      if (cursor.hasNext()) {
        current = (List) cursor.next();
        return true;
      } else {
        return false;
      }
    }

    @Override
    public BoundedSource<String> getCurrentSource() {
      return source;
    }

    @Override
    public String getCurrent() throws NoSuchElementException {
      return current.toString();
    }

    @Override
    public void close() throws IOException {
      return;
      // try {
      //   if (cursor != null) {
      //     cursor.close();
      //   }
      // } catch (Exception e) {
      //   LOG.warn("Error closing MongoDB cursor", e);
      // }
      // try {
      //   client.close();
      // } catch (Exception e) {
      //   LOG.warn("Error closing MongoDB client", e);
      // }
    }
  }

  /** Write data to InfluxDB. */
  public static Write write() {
    return null; //new AutoValue_InfluxDbIO_Write.Builder().setBatchSize(1024L).build();
  }

  /**
   * A {@link PTransform} to write to an InfluxDB database.
   */
  @AutoValue
  public abstract static class Write extends PTransform<PCollection<String>, PDone> {

    @Nullable abstract String uri();
    @Nullable abstract String database();
    abstract long batchSize();

    abstract Builder toBuilder();

    @AutoValue.Builder
    abstract static class Builder {
      abstract Builder setUri(String uri);
      abstract Builder setDatabase(String database);
      abstract Builder setBatchSize(long batchSize);
      abstract Write build();
    }

    public Write withUri(String uri) {
      return toBuilder().setUri(uri).build();
    }

    public Write withDatabase(String database) {
      return toBuilder().setDatabase(database).build();
    }

    public Write withBatchSize(long batchSize) {
      return toBuilder().setBatchSize(batchSize).build();
    }

    @Override
    public PDone expand(PCollection<String> input) {
      input.apply(ParDo.of(new WriteFn(this)));
      return PDone.in(input.getPipeline());
    }

    @Override
    public void validate(PCollection<String> input) {
      checkNotNull(uri(), "uri");
      checkNotNull(database(), "database");
      checkNotNull(batchSize(), "batchSize");
    }

    private static class WriteFn extends DoFn<String, Void> {

      private final Write spec;
      private InfluxDB influxDB;
      private List<String> batch;

      public WriteFn(Write spec) {
        this.spec = spec;
      }

      @Setup
      public void createInfluxDb() throws Exception {
        //client = new MongoClient(new MongoClientURI(spec.uri()));
        System.out.println("*******createInfluxDb********* " + spec.database()
            + " URI: " + spec.uri());
        if (influxDB == null) {
          influxDB = InfluxDBFactory.connect(spec.uri());
          influxDB.createDatabase(spec.database());
          System.out.println("*******createDatabase: " + spec.database());
        }
      }

      @StartBundle
      public void startBundle(Context ctx) throws Exception {
        batch = new ArrayList<String>();
      }

      @ProcessElement
      public void processElement(ProcessContext ctx) throws Exception {
        System.out.println("*******processElement: " + ctx.element());
        BatchPoints batchPoint = BatchPoints
            .database(spec.database())
            .tag("sflow", ctx.element())
            .retentionPolicy("autogen")
            .consistency(InfluxDB.ConsistencyLevel.ALL)
            .build();
        Point point1 = Point.measurement(spec.database())
            .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
            .addField("sflow", ctx.element())
            .build();
        batchPoint.point(point1);
        influxDB.write(batchPoint);
        // influxDB.write(1, ctx.element());
        // // Need to copy the document because mongoCollection.insertMany() will mutate it
        // // before inserting (will assign an id).
        // batch.add(new String(ctx.element()));
        // if (batch.size() >= spec.batchSize()) {
        //   finishBundle(ctx);
        // }
      }

      @FinishBundle
      public void finishBundle(Context ctx) throws Exception {
        // MongoDatabase mongoDatabase = clinet.getDatabase(spec.database());
        // MongoCollection<Document> mongoCollection = mongoDatabase.getCollection(spec.collection());
        //
        // mongoCollection.insertMany(batch);
        batch.clear();
      }

      @Teardown
      public void closeInfluxClient() throws Exception {
        //influxDB.deleteDatabase(spec.database());
        //influxDB = null;
        /*
        Query query = new Query("SELECT * FROM " + spec.database(), spec.database());
        QueryResult queryResult = influxDB.query(query);
        List<QueryResult.Result> allresults = queryResult.getResults();
        System.out.println(queryResult.getResults().get(0).getSeries().get(0).getValues());
        List temp = queryResult.getResults().get(0).getSeries().get(0).getValues();
        if (temp != null) {
          Iterator var4 = temp.iterator();
          System.out.println("Start.....");
          while (var4.hasNext()) {
            List database = (List) var4.next();
            System.out.println(database.toString());
          }
        }
        // for (List<Object> ll : temp) {
        //   System.out.println(ll.get(0));
        //   System.out.println(ll.get(1));
        // }
        */
        System.out.println("*******deleteDatabase: " + spec.database());
      }
    }
  }
}

On Fri, Jun 30, 2017 at 11:03 AM, Jean-Baptiste Onofré <[email protected]> wrote:

Hi,

can you share your code? I will fix that with you. I suggest checking the expand() method in the read PTransform and the way you use generics there.

Any plan to donate this IO? I would be happy to review the PR! Do you leverage some InfluxDB features (like splitting/sharding)?

Regards
JB

On 06/30/2017 07:26 AM, P. Ramanjaneya Reddy wrote:

Hi Beam Dev,

We have developed our own SDK IO functions for InfluxDBIO read/write operations in Apache Beam. It works with the default coder, which is StringUtf8Coder.of():

PCollection<String> output = pipeline.apply(
    InfluxDbIO.<String>read()
        .withUri("http://localhost:8086")
        .withDatabase("beam"));

Following the MongoDB and JDBC IOs, we also implemented the read function with a setCoder() option in InfluxDB, but it is not working:

PCollection<String> output = pipeline.apply(
    InfluxDbIO.<String>read()
        .withParser(new InfluxDbIO.Parser<String>() {
          @Override
          public void parse(String input, InfluxDbIO.ParserCallback<String> callback)
              throws IOException {
            callback.output(input);
          }
        })
        .withUri("http://localhost:8086")
        .withDatabase("beam")
        .withCoder(StringUtf8Coder.of()));

With the coder set, we get the following error:

java.lang.ClassCastException: org.apache.beam.sdk.values.PBegin cannot be cast to org.apache.beam.sdk.values.PCollection

Thanks & Regards,
Ramanjaneya

--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
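On the expand()/generics question quoted above, here is a minimal sketch (assuming Beam 2.0-era APIs, and following the pattern the MongoDbGridFSIO read appears to use) of applying the user-supplied coder to the parsed output inside expand(), instead of relying only on the source's default String coder. This is only a sketch of the area suggested for checking, not a confirmed fix for the ClassCastException:

    @Override
    public PCollection<T> expand(PBegin input) {
      // Read the raw records as Strings from the bounded source.
      PCollection<String> raw =
          input.apply(org.apache.beam.sdk.io.Read.from(new BoundedInfluxDbSource(this)));
      // Parse each raw record into T using the user-supplied Parser.
      PCollection<T> parsed = raw.apply("ParseEntries", ParDo.of(new DoFn<String, T>() {
        @ProcessElement
        public void processElement(final ProcessContext c) throws IOException {
          getparser().parse(c.element(), new ParserCallback<T>() {
            @Override
            public void output(T output) {
              c.output(output);
            }
          });
        }
      }));
      // Apply the coder set via withCoder(...) to the parsed PCollection, if any.
      if (getCoder() != null) {
        parsed.setCoder(getCoder());
      }
      return parsed;
    }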
--
Jean-Baptiste Onofré
[email protected]
http://blog.nanthrax.net
Talend - http://www.talend.com
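On the splitting/sharding question, a hypothetical sketch (not part of the code above, and assuming a BoundedInfluxDbSource(Read spec, String query) constructor were added) of how splitIntoBundles() could produce one source per measurement via SHOW MEASUREMENTS, so that each bundle runs its own query instead of re-adding the same source:

    @Override
    public List<? extends BoundedSource<String>> splitIntoBundles(
        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
      List<BoundedSource<String>> sources = new ArrayList<BoundedSource<String>>();
      InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
      // List the measurements in the database; each one becomes its own split.
      QueryResult result = influxDB.query(new Query("SHOW MEASUREMENTS", spec.getdatabase()));
      for (List<Object> row : result.getResults().get(0).getSeries().get(0).getValues()) {
        String measurement = (String) row.get(0);
        // Hypothetical constructor taking the per-split query.
        sources.add(new BoundedInfluxDbSource(spec, "SELECT * FROM \"" + measurement + "\""));
      }
      checkArgument(!sources.isEmpty(), "No measurements found to split on");
      return sources;
    }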
