Hi JB,
Here is the complete code for the InfluxDB read/write transforms. I have
followed the same generic approach as the other IOs.
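
For context, here is a minimal sketch of how I expect the transforms to be
used (imports omitted; the URI and the "beam" database name are just
placeholders from my local test setup):

    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    // Read each row of the "beam" database as a String (default coder case).
    PCollection<String> rows = pipeline.apply(
        InfluxDbIO.<String>read()
            .withUri("http://localhost:8086")
            .withDatabase("beam"));

    // Write each element back to InfluxDB as a single point.
    rows.apply(
        InfluxDbIO.write()
            .withUri("http://localhost:8086")
            .withDatabase("beam"));

    pipeline.run().waitUntilFinish();

The full InfluxDbIO class follows:
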
package org.apache.beam.sdk.io.influxdb;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import com.google.auto.value.AutoValue;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.influxdb.InfluxDB;
import org.influxdb.InfluxDBFactory;
import org.influxdb.dto.BatchPoints;
import org.influxdb.dto.Point;
import org.influxdb.dto.Query;
import org.influxdb.dto.QueryResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
/**
* IO to read data from and write data to InfluxDB.
*/
public class InfluxDbIO {
private static final Logger LOG = LoggerFactory.getLogger(InfluxDbIO.class);
private InfluxDbIO() {
}
/**
* Callback for the parser to use to submit data.
*/
public interface ParserCallback<T> extends Serializable {
/**
* Output the parsed object to the pipeline.
* @param output the parsed element to emit
*/
void output(T output);
}
/**
* Interface for the parser that is used to parse an InfluxDB result row
* (serialized as a String) into the appropriate type.
* @param <T> type of the parsed element
*/
public interface Parser<T> extends Serializable {
void parse(String input, ParserCallback<T> callback) throws IOException;
}
/**
* For the default {@code Read<String>} case, this is the parser that is
* used: it simply emits each row String unchanged.
*/
private static final Parser<String> TEXT_PARSER = new Parser<String>() {
@Override
public void parse(String input, ParserCallback<String> callback)
throws IOException {
callback.output(input);
}
};
/**
* Read data from an InfluxDB database.
*
* @param <T> Type of the data to be read.
*/
public static <T> Read<T> read() {
// Default to the pass-through parser; withParser() can override it.
return new AutoValue_InfluxDbIO_Read.Builder<T>()
.setParser((Parser<T>) TEXT_PARSER).build();
}
/**
* A {@link PTransform} to read data from InfluxDB.
*/
@AutoValue
public abstract static class Read<T> extends PTransform<PBegin,
PCollection<T>> {
@Nullable abstract Parser<T> getparser();
@Nullable abstract String geturi();
@Nullable abstract String getdatabase();
@Nullable abstract Coder<T> getCoder();
abstract Builder<T> toBuilder();
@AutoValue.Builder
abstract static class Builder<T> {
abstract Builder<T> setParser(Parser<T> parser);
abstract Builder<T> setUri(String uri);
abstract Builder<T> setDatabase(String database);
abstract Builder<T> setCoder(Coder<T> coder);
abstract Read<T> build();
}
public <X> Read<X> withParser(Parser<X> parser) {
checkNotNull(parser);
Builder<X> builder = (Builder<X>) toBuilder();
return builder.setParser(parser).setCoder(null).build();
}
/**
* Define the connection URI of the InfluxDB instance to connect to
* (for example {@code http://localhost:8086}).
*/
public Read<T> withUri(String uri) {
checkNotNull(uri);
return toBuilder().setUri(uri).build();
}
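/**
* Define the InfluxDB database to read from.
*/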
public Read<T> withDatabase(String database) {
checkNotNull(database);
return toBuilder().setDatabase(database).build();
}
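/**
* Define the coder used for the elements emitted by the parser.
*/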
public Read<T> withCoder(Coder<T> coder) {
checkArgument(coder != null,
"InfluxDbIO.read().withCoder(coder) called with null coder");
return toBuilder().setCoder(coder).build();
}
@Override
public PCollection<T> expand(PBegin input) {
PCollection<String> inflxDb =
input.apply(org.apache.beam.sdk.io.Read.from(new
BoundedInfluxDbSource(this)));
System.out.println("***************Get InfluxDB ***********");
PCollection<String> output = inflxDb.apply("Print List",
ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("Print List: " + c.element());
c.output(c.element());
}
}));
PCollection<T> poutput = output.apply(ParDo.of(new
DoFn<String, T>() {
@ProcessElement
public void processElement(final ProcessContext c)
throws IOException {
System.out.println("***************processElement***********");
String entry = c.element();
getparser().parse(entry, new ParserCallback<T>() {
@Override
public void output(T output) {
c.output(output);
}
});
}
}));
System.out.println("***************Get InfluxDB Finished
***********");
return poutput;
}
@Override
public void validate(PBegin input) {
checkNotNull(geturi(), "uri");
checkNotNull(getdatabase(), "database");
checkState(getCoder() != null,
"InfluxDbIO.read() requires a coder to be set via withCoder(coder)");
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
builder.add(DisplayData.item("uri", geturi()));
builder.add(DisplayData.item("database", getdatabase()));
builder.add(DisplayData.item("coder",
getCoder().getClass().getName()));
}
/** A {@link DoFn} mapping each row read from InfluxDB to an output element (not used by expand() yet). */
static class ReadFn<T> extends DoFn<String, T> {
private InfluxDbIO.Read<T> spec;
private ReadFn(Read<T> spec) {
this.spec = spec;
}
@ProcessElement
public void processElement(ProcessContext context) throws
Exception {
System.out.println("InfluxDB entry: " + context.element());
context.output((T) context.element()); // Rama: Need to check the implementation
}
}
}
private static class BoundedInfluxDbSource extends BoundedSource<String> {
private Read spec;
public BoundedInfluxDbSource(Read spec) {
this.spec = spec;
}
@Override
public Coder<String> getDefaultOutputCoder() {
return SerializableCoder.of(String.class);
}
@Override
public void validate() {
spec.validate(null);
}
@Override
public void populateDisplayData(DisplayData.Builder builder) {
spec.populateDisplayData(builder);
}
@Override
public BoundedReader<String> createReader(PipelineOptions
options) throws IOException {
return new BoundedInfluxDbReader(this);
}
@Override
public long getEstimatedSizeBytes(PipelineOptions options)
throws Exception {
System.out.println("getEstimatedSizeBytes -->");
InfluxDB influxDB = InfluxDBFactory.connect(spec.geturi());
influxDB.createDatabase(spec.getdatabase());
// The database name doubles as the measurement name for now.
Query query = new Query("SELECT * FROM " + spec.getdatabase(), spec.getdatabase());
QueryResult queryResult = influxDB.query(query);
List<List<Object>> rows =
queryResult.getResults().get(0).getSeries().get(0).getValues();
// Rough estimate: total number of values returned, not actual bytes.
int size = 0;
if (rows != null) {
for (List<Object> row : rows) {
size += row.size();
}
}
return size;
}
@Override
public List<? extends BoundedSource<String>>
splitIntoBundles(long desiredBundleSizeBytes, PipelineOptions options)
throws Exception {
// Splitting is not implemented yet: return this source as a single bundle
// instead of one copy per row. Need to check InfluxDB splitting/sharding .. Rama
List<BoundedSource<String>> sources = new ArrayList<BoundedSource<String>>();
sources.add(this);
return sources;
}
}
private static class BoundedInfluxDbReader extends
BoundedSource.BoundedReader<String> {
private final BoundedInfluxDbSource source;
private Iterator<List<Object>> cursor;
private List<Object> current;
private InfluxDB influxDB;
public BoundedInfluxDbReader(BoundedInfluxDbSource source) {
this.source = source;
}
@Override
public boolean start() throws IOException {
Read spec = source.spec;
System.out.println("*******Query InfluxDb********* " +
spec.getdatabase() + " URI: " + spec.geturi());
influxDB = InfluxDBFactory.connect(spec.geturi());
influxDB.createDatabase(spec.getdatabase());
// The database name doubles as the measurement name for now.
Query query = new Query("SELECT * FROM " +
spec.getdatabase(), spec.getdatabase());
QueryResult queryResult = influxDB.query(query);
List<List<Object>> rows =
queryResult.getResults().get(0).getSeries().get(0).getValues();
if (rows != null) {
cursor = rows.iterator();
}
return advance();
}
@Override
public boolean advance() throws IOException {
// cursor can be null when the query returned no rows.
if (cursor != null && cursor.hasNext()) {
current = cursor.next();
return true;
} else {
return false;
}
}
@Override
public BoundedSource<String> getCurrentSource() {
return source;
}
@Override
public String getCurrent() throws NoSuchElementException {
if (current == null) {
throw new NoSuchElementException();
}
return current.toString();
}
@Override
public void close() throws IOException {
// Nothing to close for now.
}
}
/** Write data to InfluxDB. */
public static Write write() {
return new AutoValue_InfluxDbIO_Write.Builder().setBatchSize(1024L).build();
}
/**
* A {@link PTransform} to write data to an InfluxDB database.
*/
@AutoValue
public abstract static class Write extends
PTransform<PCollection<String>, PDone> {
@Nullable abstract String uri();
@Nullable abstract String database();
abstract long batchSize();
abstract Builder toBuilder();
@AutoValue.Builder
abstract static class Builder {
abstract Builder setUri(String uri);
abstract Builder setDatabase(String database);
abstract Builder setBatchSize(long batchSize);
abstract Write build();
}
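/**
* Define the connection URI of the InfluxDB instance to write to.
*/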
public Write withUri(String uri) {
return toBuilder().setUri(uri).build();
}
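/**
* Define the InfluxDB database to write to.
*/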
public Write withDatabase(String database) {
return toBuilder().setDatabase(database).build();
}
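/**
* Define the batch size (default is 1024). Note: batching is not wired
* into WriteFn yet, so each element is currently written individually.
*/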
public Write withBatchSize(long batchSize) {
return toBuilder().setBatchSize(batchSize).build();
}
@Override
public PDone expand(PCollection<String> input) {
input.apply(ParDo.of(new WriteFn(this)));
return PDone.in(input.getPipeline());
}
@Override
public void validate(PCollection<String> input) {
checkNotNull(uri(), "uri");
checkNotNull(database(), "database");
checkNotNull(batchSize(), "batchSize");
}
private static class WriteFn extends DoFn<String, Void> {
private final Write spec;
private InfluxDB influxDB;
private List<String> batch;
public WriteFn(Write spec) {
this.spec = spec;
}
@Setup
public void createInfluxDb() throws Exception {
System.out.println("*******createInfluxDb********* "
+ spec.database() + " URI: " + spec.uri());
if (influxDB == null) {
influxDB = InfluxDBFactory.connect(spec.uri());
influxDB.createDatabase(spec.database());
System.out.println("*******createDatabase: " +
spec.database());
}
}
@StartBundle
public void startBundle(Context ctx) throws Exception {
batch = new ArrayList<String>();
}
@ProcessElement
public void processElement(ProcessContext ctx) throws Exception {
System.out.println("*******processElement: " + ctx.element());
BatchPoints batchPoint = BatchPoints
.database(spec.database())
.tag("sflow", ctx.element())
.retentionPolicy("autogen")
.consistency(InfluxDB.ConsistencyLevel.ALL)
.build();
Point point1 = Point.measurement(spec.database())
.time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
.addField("sflow", ctx.element())
.build();
batchPoint.point(point1);
influxDB.write(batchPoint);
// TODO: buffer elements and flush in batches of spec.batchSize():
// batch.add(ctx.element());
// if (batch.size() >= spec.batchSize()) {
// finishBundle(ctx);
// }
}
@FinishBundle
public void finishBundle(Context ctx) throws Exception {
// TODO: flush any buffered points to InfluxDB here once batching is enabled.
batch.clear();
}
@Teardown
public void closeInfluxClient() throws Exception {
// Nothing to tear down for now; the database is intentionally left in place.
System.out.println("*******closeInfluxClient: " + spec.database());
}
}
}
}
On Fri, Jun 30, 2017 at 11:03 AM, Jean-Baptiste Onofré <[email protected]>
wrote:
> Hi,
>
> Can you share your code? I will fix that with you. I suggest checking the
> expand() method in the read PTransform and the way you use generics there.
>
> Any plan to donate this IO? I would be happy to review the PR!
>
> Do you leverage some InfluxDB feature (like splitting/sharding) ?
>
> Regards
> JB
>
>
> On 06/30/2017 07:26 AM, P. Ramanjaneya Reddy wrote:
>
>> Hi Beam Dev,
>>
>> We have developed our own SDK IO functions for read/write InfluxDbIO
>> operations in Apache Beam. It works with the default coder, which is
>> StringUtf8Coder.of().
>>
>> PCollection<String> output = pipeline.apply(
>> InfluxDbIO.<String>read()
>> .withUri("http://localhost:8086")
>> .withDatabase("beam"));
>>
>>
>>
>>
>> With reference to the MongoDB and JDBC IOs, I implemented the read function
>> with the setCoder() option in InfluxDbIO as well, but it is not working.
>>
>> PCollection<String> output = pipeline.apply(
>> InfluxDbIO.<String>read()
>> .withParser(new InfluxDbIO.Parser<String>() {
>> @Override
>> public void parse(String input,
>> InfluxDbIO.ParserCallback<String>
>> callback) throws IOException {
>> callback.output(input);
>> }
>> })
>> .withUri("http://localhost:8086")
>> .withDatabase("beam")
>> .withCoder(StringUtf8Coder.of()));----> with coder
>> I am getting the following error:
>>
>> java.lang.ClassCastException: org.apache.beam.sdk.values.PBegin cannot be
>> cast to org.apache.beam.sdk.values.PCollection
>>
>> Thanks & Regards,
>> Ramanjaneya
>>
>>
> --
> Jean-Baptiste Onofré
> [email protected]
> http://blog.nanthrax.net
> Talend - http://www.talend.com
>