[
https://issues.apache.org/jira/browse/STORM-1448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jayasree reassigned STORM-1448:
-------------------------------
Assignee: (was: Parth Brahmbhatt)
> How to return objects in declare output fields
> ----------------------------------------------
>
> Key: STORM-1448
> URL: https://issues.apache.org/jira/browse/STORM-1448
> Project: Apache Storm
> Issue Type: Bug
> Environment: Linux OS, Storm , Kafka
> Reporter: Jayasree
>
> I was working on Twitter data using kafka-storm. I was using the deserialize
> method to parse the Twitter data in a Storm spout, and in the parser I
> faced a problem returning all the objects. What I did was add
> them to a list and return the list. Up to that point it works fine, but when
> it comes to getOutputFields() it shows a runtime error, i.e.,
> ERROR : java.lang.IllegalArgumentException: Tuple created with wrong number
> of fields. Expected 6 fields but got 140 fields at
> backtype.storm.tuple.TupleImpl.<init>(TupleImpl.java:58) at
> backtype.storm.daemon.executor$fn_5624$fn5639$send_spout_msg5658.invoke(executor.clj:529)
> at backtype.storm.daemon.executor$fn5624$fn$reify5668.emit(executor.clj:568)
> at
> backtype.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:49)
> at
> backtype.storm.spout.SpoutOutputCollector.emit(SpoutOutputCollector.java:63)
> at storm.kafka.PartitionManager.next(PartitionManager.java:141) at
> storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:141) at
> backtype.storm.daemon.executor$fn5624$fn5639$fn5670.invoke(executor.clj:607)
> at backtype.storm.util$async_loop$fn_545.invoke(util.clj:479) at
> clojure.lang.AFn.run(AFn.java:22) at java.lang.Thread.run(Thread.java:745)
> I was returning 6 objects, but it shows that it is getting 60 fields. How can
> I solve that? Can anyone suggest a solution?
> Below is the program I am working on.
> Program:
> package Demo;
> import java.sql.Timestamp;
> import java.text.DateFormat;
> import java.text.SimpleDateFormat;
> import java.util.Date;
> import java.io.IOException;
> import java.util.Properties;
> import java.util.*;
> import twitter4j.JSONArray;
> import twitter4j.JSONObject;
> import twitter4j.JSONObjectType;
> import java.io.UnsupportedEncodingException;
> import backtype.storm.spout.Scheme;
> import backtype.storm.spout.RawScheme;
> import backtype.storm.spout.SpoutOutputCollector;
> import backtype.storm.task.TopologyContext;
> import backtype.storm.topology.OutputFieldsDeclarer;
> import backtype.storm.topology.base.BaseRichSpout;
> import backtype.storm.tuple.Fields;
> import backtype.storm.tuple.Values;
> public class TwitterTweet implements Scheme{
> //twitter_tweets fields
> String created_at;
> String id;
> String id_str;
> String text;
> String source;
> String truncated;
>
> int i=0;int j=0;
> @Override
> public List<Object> deserialize(final byte[] bytes) {
> List<Object> list = new ArrayList<Object>() {
> {
> try{
> String twitterEvent = new String(bytes, "UTF-8");
>
> JSONArray JSON = new JSONArray(twitterEvent); // kafka topic name(twitterEvent)
> for(i=0;i<JSON.length();i++)
> {
> JSONObject object_tweet=JSON.getJSONObject(i);
>
> //Tweet status
>
> try{
> add(created_at=object_tweet.getString("created_at"));
> add(id=object_tweet.getString("id"));
> add(id_str=object_tweet.getString("id_str"));
> add(text=object_tweet.getString("text"));
> add(source=object_tweet.getString("source"));
> add(truncated=object_tweet.getString("truncated"));
>
> }catch(Exception e){}
> }//JSON main_array for close
> }catch(Exception e){} // UTF- try close
> }
> };
> return list;
> } //deserialize method close
> public Fields getOutputFields() {
> return new Fields
> ("created_at","id","id_str","text","source","truncated");
> } // getOutputFields() method close
> } //class close
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)