[ 
https://issues.apache.org/jira/browse/STORM-1139?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14989352#comment-14989352
 ] 

hima commented on STORM-1139:
-----------------------------

Hi,

I wrote a PostgreSQL bolt and the topology submits successfully, but the bolt
fails with the following error:
java.lang.NullPointerException
    at Demo.PsqlBolt.execute(PsqlBolt.java:88)
    at backtype.storm.daemon.executor$fn__6647$tuple_action_fn__6649.invoke(executor.clj:633)
    at backtype.storm.daemon.executor$mk_task_receiver$fn__6570.invoke(executor.clj:401)
    at backtype.storm.disruptor$clojure_handler$reify__1605.onEvent(disruptor.clj:58)
    at backtype.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:125)
    at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:99)
    at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
    at backtype.storm.daemon.executor$fn__6647$fn__6659$fn__6706.invoke(executor.clj:748)
    at backtype.storm.util$async_loop$fn__459.invoke(util.clj:463)
    at clojure.lang.AFn.run(AFn.java:24)
    at java.lang.Thread.run(Thread.java:745)

My spout is:

public static final List<Values> rows = Lists.newArrayList(
        new Values(1, "peter", System.currentTimeMillis()),
        new Values(2, "bob", System.currentTimeMillis()),
        new Values(3, "alice", System.currentTimeMillis()));

public UserSpout() {
    this(true);
}

public void nextTuple() {
    final Random rand = new Random();
    // nextInt(bound) excludes the bound, so pass rows.size() to reach the last row
    final Values row = rows.get(rand.nextInt(rows.size()));
    this.collector.emit(row);
    Thread.yield();
}

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("user_id", "user_name", "create_date"));
}
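
A side note on the row index in nextTuple: Random.nextInt(bound) returns values from 0 up to but not including bound, so a bound of rows.size() - 1 can never select the last row, while a bound of rows.size() covers every row. This is unrelated to the NullPointerException. The standalone check below is only a sketch; the class name NextIntBoundCheck and the helper everPicks are hypothetical and not part of the topology.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical standalone class; it does not use any Storm classes.
class NextIntBoundCheck {
    // Returns true if nextInt(bound) produces `target` at least once in `draws` tries.
    static boolean everPicks(int bound, int target, int draws) {
        Random rand = new Random(42); // fixed seed so the run is repeatable
        for (int i = 0; i < draws; i++) {
            if (rand.nextInt(bound) == target) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        List<String> rows = Arrays.asList("peter", "bob", "alice");
        int last = rows.size() - 1;
        // Bound of size() - 1: only indexes 0 and 1 are possible, "alice" is never chosen.
        System.out.println(everPicks(rows.size() - 1, last, 10000));
        // Bound of size(): indexes 0, 1 and 2 are all possible.
        System.out.println(everPicks(rows.size(), last, 10000));
    }
}
```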

My bolt is:
public PsqlBolt withInsertQuery(String insertQuery) {
    this.insertQuery = insertQuery;
    return this;
}

public void execute(Tuple tuple) {
    try {
        List<Column> columns = jdbcMapper.getColumns(tuple);
        List<List<Column>> columnLists = new ArrayList<List<Column>>();
        columnLists.add(columns);
        if (!StringUtils.isBlank(tableName)) {
            this.jdbcClient.insert(this.tableName, columnLists);
        } else {
            // The error is reported on this call.
            this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
        }
        this.collector.ack(tuple);
    } catch (Exception e) {
        this.collector.reportError(e);
        this.collector.fail(tuple);
    }
}
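
One possible explanation, not confirmed for PsqlBolt.java:88, is how the bolt holds its JDBC objects. Storm serializes bolt instances with Java serialization when the topology is submitted, so a non-serializable field such as a live connection causes a NotSerializableException, and a field marked transient comes back as null on the worker unless it is created again in prepare(). The sketch below shows only that behaviour with plain Java serialization and no Storm classes; FakeConnection, SketchBolt, roundTrip and the JDBC URL are all hypothetical names used for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a JDBC connection; deliberately NOT Serializable.
class FakeConnection {
    final String url;

    FakeConnection(String url) {
        this.url = url;
    }
}

// Hypothetical stand-in for a bolt; only the serialization behaviour is modelled.
class SketchBolt implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String jdbcUrl;            // plain configuration: safe to serialize
    private transient FakeConnection client; // skipped by serialization, null afterwards

    SketchBolt(String jdbcUrl) {
        this.jdbcUrl = jdbcUrl;
    }

    // Plays the role of prepare(): called on the worker, after deserialization.
    void prepare() {
        this.client = new FakeConnection(jdbcUrl);
    }

    boolean hasClient() {
        return client != null;
    }

    // Serializes and deserializes the bolt, roughly what happens between submit and run.
    static SketchBolt roundTrip(SketchBolt bolt) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(bolt);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SketchBolt) in.readObject();
            }
        } catch (Exception e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        SketchBolt submitted = new SketchBolt("jdbc:postgresql://localhost/test");
        submitted.prepare(); // a client created before submit does not survive
        SketchBolt onWorker = roundTrip(submitted);
        System.out.println(onWorker.hasClient()); // false: the transient field is null
        onWorker.prepare();
        System.out.println(onWorker.hasClient()); // true: recreated on the worker side
    }
}
```

If the real bolt creates jdbcClient in its constructor or in the topology code rather than in prepare(), the same pattern would leave it null inside execute.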

My topology is:
ConnectionProvider cp = new MyConnectionProvider(map);

jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp);

List<Column> schemaColumns = Lists.newArrayList(
        new Column("user_id", Types.INTEGER),
        new Column("user_name", Types.VARCHAR),
        new Column("create_date", Types.TIMESTAMP));

JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);

PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
        .withInsertQuery("insert into user_details (user_id, user_name, create_date) values (?,?,?)");

builder.setSpout("myspout", new UserSpout(), 1);

builder.setBolt("Psql_Bolt", userPersistanceBolt, 1).shuffleGrouping("myspout");


Can anyone help me?


> Issues regarding storm-postgresql interface
> -------------------------------------------
>
>                 Key: STORM-1139
>                 URL: https://issues.apache.org/jira/browse/STORM-1139
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: hima
>
> Hi,
>       I am trying to write a Storm bolt to insert data into a PostgreSQL DB, but I am 
> facing issues like 
> java.io.NotSerializableException: org.postgresql.jdbc4.Jdbc4Connection.
> Can anyone provide the full code for a Storm bolt that can insert data into 
> a Postgres database?



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
