[ 
https://issues.apache.org/jira/browse/STORM-1512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15125841#comment-15125841
 ] 

hima commented on STORM-1512:
-----------------------------

Here is my sample code:

Topology:

public class Topology {

ConnectionProvider cp;
protected static final String JDBC_CONF = "jdbc.conf";
protected static final String TABLE_NAME = "users";

public static void main(String[] args) throws Exception{
    String argument = args[0];
    JdbcMapper jdbcMapper;
   TopologyBuilder builder = new TopologyBuilder();
Map map = Maps.newHashMap();
        map.put("dataSourceClassName", "org.postgresql.ds.PGSimpleDataSource");
    
map.put("dataSource.url","jdbc:postgresql://localhost:5432/analysis?user=postgres");
ConnectionProvider cp = new MyConnectionProvider(map);

    jdbcMapper = new SimpleJdbcMapper(TABLE_NAME, cp);

    List<Column> schemaColumns = Lists.newArrayList(new Column("user_id", 
Types.INTEGER), new Column             ("user_name",Types.VARCHAR),new 
Column("create_date", Types.TIMESTAMP));

    JdbcMapper mapper = new SimpleJdbcMapper(schemaColumns);

        PsqlBolt userPersistanceBolt = new PsqlBolt(cp, mapper)
    .withInsertQuery("insert into user_details (id, user_name, 
created_timestamp) values (?,?,?)");

    builder.setSpout("myspout", new UserSpout(), 1);

    builder.setBolt("Psql_Bolt", 
userPersistanceBolt,1).shuffleGrouping("myspout");

    jdbcMapper = new SimpleJdbcMapper("My_details", cp);

    List<Column> schemaColumns1 = Lists.newArrayList(new Column("my_id", 
Types.INTEGER), new Column             ("my_name",Types.VARCHAR));

    JdbcMapper mapper1 = new SimpleJdbcMapper(schemaColumns1);

        PsqlBolt1 userPersistanceBolt1 = new PsqlBolt1(cp, mapper1)
    .withInsertQuery("insert into My_details (my_id, my_name) values (?,?)");

    //builder.setSpout("myspout", new UserSpout(), 1);

    builder.setBolt("Psql_Bolt1", 
userPersistanceBolt1,1).shuffleGrouping("myspout");
 Config conf = new Config();
     conf.put(JDBC_CONF, map);
     conf.setDebug(true);
         conf.setNumWorkers(3);


    if (argument.equalsIgnoreCase("runLocally")){
        System.out.println("Running topology locally...");
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("Twitter Test Storm-postgresql", conf, 
builder.createTopology());
    }
    else {
        System.out.println("Running topology on cluster...");
        StormSubmitter.submitTopology("Topology_psql", conf, 
builder.createTopology()); 
    }
}}


PsqlBolt:

public class PsqlBolt extends AbstractJdbcBolt {
private static final Logger LOG = Logger.getLogger(PsqlBolt.class);
 private String tableName;
 private String insertQuery;
 private JdbcMapper jdbcMapper;

public PsqlBolt(ConnectionProvider connectionProvider,  JdbcMapper jdbcMapper) {
    super(connectionProvider);
    this.jdbcMapper = jdbcMapper;
}
public PsqlBolt withTableName(String tableName) {
    this.tableName = tableName;
    return this;
}

public PsqlBolt withInsertQuery(String insertQuery) {
    this.insertQuery = insertQuery;
System.out.println("query passsed.....");
    return this;
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector 
collector) {
    super.prepare(map, topologyContext, collector);
    if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
        throw new IllegalArgumentException("You must supply either a tableName 
or an insert Query.");
    }
}

@Override
public void execute(Tuple tuple) {
    try {

        List<Column> columns = jdbcMapper.getColumns(tuple);
        List<List<Column>> columnLists = new ArrayList<List<Column>>();
        columnLists.add(columns);
        if(!StringUtils.isBlank(tableName)) {
            this.jdbcClient.insert(this.tableName, columnLists);
        } else {
            this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
        }
        this.collector.ack(tuple);
    } catch (Exception e) {
        this.collector.reportError(e);
        this.collector.fail(tuple);
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

}}


PsqlBolt1:

public class PsqlBolt1 extends AbstractJdbcBolt {
private static final Logger LOG = Logger.getLogger(PsqlBolt1.class);

 private String tableName;
 private String insertQuery;
 private JdbcMapper jdbcMapper;

public PsqlBolt1(ConnectionProvider connectionProvider,  JdbcMapper jdbcMapper) 
{
    super(connectionProvider);
    this.jdbcMapper = jdbcMapper;
}
public PsqlBolt1 withInsertQuery(String insertQuery) {
    this.insertQuery = insertQuery;
System.out.println("query passsed.....");
    return this;
}
@Override
public void prepare(Map map, TopologyContext topologyContext, OutputCollector 
collector) {
    super.prepare(map, topologyContext, collector);
    if(StringUtils.isBlank(tableName) && StringUtils.isBlank(insertQuery)) {
        throw new IllegalArgumentException("You must supply either a tableName 
or an insert Query.");
    }
}

@Override
public void execute(Tuple tuple) {
    try {
  Thread.sleep(1000);
        List<Column> columns = jdbcMapper.getColumns(tuple);
        List<List<Column>> columnLists = new ArrayList<List<Column>>();
        columnLists.add(columns);
        if(!StringUtils.isBlank(tableName)) {
            this.jdbcClient.insert(this.tableName, columnLists);
        } else {
            this.jdbcClient.executeInsertQuery(this.insertQuery, columnLists);
        }
        this.collector.ack(tuple);
    } catch (Exception e) {
        this.collector.reportError(e);
        this.collector.fail(tuple);
    }
}

@Override
public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {

}}

> how to execute one bolt after another when the input is taken from same spout.
> ------------------------------------------------------------------------------
>
>                 Key: STORM-1512
>                 URL: https://issues.apache.org/jira/browse/STORM-1512
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: hima
>
> My requirement is to first execute one bolt and upon successful execution 
> only next bolt have to execute and i am giving the input for the bolts from 
> same spout.For that for second bolt i am using Thread.sleep() method, its 
> working fine but have performance issues.Can anyone help me if there is any 
> alternative for this problem.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to