[
https://issues.apache.org/jira/browse/STORM-1139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14989352#comment-14989352
]
hima commented on STORM-1139:
-----------------------------
Hi,
I wrote a PostgreSQL bolt and the topology submits successfully, but the bolt
fails with this error:
java.lang.NullPointerException
    at Demo.PsqlBolt.execute(PsqlBolt.java:88)
    at backtype.storm.daemon.executor$fn__6647$tuple_action_fn__6649.invoke(executor.clj:633)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__6570.invoke(executor.clj:401)
    at backtype.storm.disruptor$clojure_handler$reify__1605.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.daemon.executor$fn__6647$fn__6659$fn__6706.invoke(executor.clj:748)
    at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)
My spout is:
public static final List<Values> rows = Lists.newArrayList(
    new Values(1, "peter", System.currentTimeMillis()),
    new Values(2, "bob", System.currentTimeMillis()),
    new Values(3, "alice", System.currentTimeMillis()));

public UserSpout() {
    this(true);
}

public void nextTuple() {
    final Random rand = new Random();
    // nextInt's bound is exclusive, so rows.size() is needed to reach the last row
    final Values row = rows.get(rand.nextInt(rows.size()));
    this.collector.emit(row);
    Thread.yield();
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("user_id", "user_name", "create_date"));
}
My bolt is:
public PsqlBolt withInsertQuery(String insertQuery) {
    this.insertQuery = insertQuery;
    return this;
}

public void execute(Tuple tuple) {
    try {
        List<Column> columns = jdbcMapper.getColumns(tuple);
        List<List<Column>> columnLists = new ArrayList<List<Column>>();
        columnLists.add(columns);
        if (!StringUtils.isBlank(tableName)) {
            this.jdbcClient.insert(this.tableName, columnLists);
        } else {
            // The issue shows up here
            this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
        }
        this.collector.ack(tuple);
    } catch (Exception e) {
        this.collector.reportError(e);
        this.collector.fail(tuple);
    }
}
My topology is:
ConnectionProvider cp = new MyConnectionProvider(map);
jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp);
List<Column> schemaColumns = Lists.newArrayList(
    new Column("user_id", Types.INTEGER),
    new Column("user_name", Types.VARCHAR),
    new Column("create_date", Types.TIMESTAMP));
JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);
PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
    .withInsertQuery("insert into user_details (user_id, user_name, create_date) values (?,?,?)");
builder.setSpout("myspout", new UserSpout(), 1);
builder.setBolt("Psql_Bolt", userPersistanceBolt, 1).shuffleGrouping("myspout");
Can anyone help me?
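
A note on one possible cause, offered as a guess because the bolt's prepare()
method is not shown above. Storm ships bolts to workers using Java
serialization, so a field holding a JDBC connection or client usually has to be
marked transient (otherwise a NotSerializableException is raised on submit). A
transient field is null after deserialization, so if it is not rebuilt in
prepare(), the first use in execute() throws a NullPointerException. The sketch
below reproduces that pattern with plain Java only. FakeClient and SketchBolt
are hypothetical stand-ins, not Storm or storm-jdbc classes, and the query
string is illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientFieldSketch {

    // Hypothetical stand-in for a JDBC client/connection: deliberately NOT Serializable.
    static class FakeClient {
        String insert(String query) {
            return "executed: " + query;
        }
    }

    // Hypothetical stand-in for a bolt: serialized on submit, deserialized on the worker.
    static class SketchBolt implements Serializable {
        private static final long serialVersionUID = 1L;

        private final String insertQuery;     // plain configuration, safe to serialize
        private transient FakeClient client;  // skipped by serialization, null afterwards

        SketchBolt(String insertQuery) {
            this.insertQuery = insertQuery;
        }

        // Plays the role of prepare(): build non-serializable resources on the worker.
        void prepare() {
            this.client = new FakeClient();
        }

        // Plays the role of execute(): throws NullPointerException if prepare() never ran.
        String execute() {
            return client.insert(insertQuery);
        }
    }

    // Round-trips the object through Java serialization, as happens when a topology is shipped.
    static SketchBolt roundTrip(SketchBolt bolt) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(bolt);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SketchBolt) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SketchBolt shipped = roundTrip(new SketchBolt("insert into user_details values (?,?,?)"));
        try {
            shipped.execute();
        } catch (NullPointerException e) {
            System.out.println("NullPointerException before prepare()");
        }
        shipped.prepare();
        System.out.println(shipped.execute());
    }
}
```

If this matches the situation in PsqlBolt, checking which of jdbcMapper,
jdbcClient, or collector is null at PsqlBolt.java:88, and where each one is
assigned, should narrow it down.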
> Issues regarding storm-postgresql interface
> -------------------------------------------
>
> Key: STORM-1139
> URL: https://issues.apache.org/jira/browse/STORM-1139
> Project: Apache Storm
> Issue Type: Bug
> Reporter: hima
>
> Hi,
> I am trying to write a Storm bolt to insert data into a PostgreSQL DB, but I am
> facing issues like
> java.io.NotSerializableException: org.postgresql.jdbc4.Jdbc4Connection.
> Can anyone provide me with full code for a Storm bolt that can insert data into
> a Postgres database?
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)