Yeah, you got my 'theory'. My thought is that collect does not ensure K, V
have been 'safely collected'. Give it a go and see if it fixes your issue.
St.Ack
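[Editor's note: a plain-Java sketch of the object-reuse concern discussed here. `HashMap` stands in for Hadoop's `MapWritable` (which implements the same `Map` put-overwrites semantics), and the class and method names are illustrative, not from the thread. It shows what happens *if* the collector holds a reference instead of copying: every emitted record shares one map, and only the last genre survives.]

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ReuseDemo {
    // Emits one record per genre, but reuses the same map and the same
    // "genre:genre" key every time -- mirroring the mapper in question.
    static List<Map<String, String>> emitReusing(String[] genres) {
        Map<String, String> mw = new HashMap<>();
        List<Map<String, String>> collected = new ArrayList<>();
        for (String g : genres) {
            mw.put("genre:genre", g); // same key: overwrites previous genre
            collected.add(mw);        // same object: no copy is taken here
        }
        return collected;
    }

    public static void main(String[] args) {
        List<Map<String, String>> out = emitReusing(
                new String[] {"Action/Adventure", "Thriller", "Adaptation"});
        // Three records were "emitted", but they are one shared map
        // holding only the last genre put into it.
        for (Map<String, String> m : out) {
            System.out.println(m.get("genre:genre")); // "Adaptation", 3 times
        }
    }
}
```

Instantiating a fresh `MapWritable` before each `collect` avoids the shared-object question entirely.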
On Fri, Nov 28, 2008 at 10:15 PM, Nishant Khurana <[EMAIL PROTECTED]> wrote:
> Hi,
> Yeah, I have added output collect in for loop because I thought it would
> emit the full row again n again for each value of genre i.e. all the other
> columns being the same for a particular movie_id, only genre will change
> and
> emit different rows. At the reducer end, it will just parse the mapwritable
> and insert it using the row key. Now when it comes across multiple rows
> with
> same row key and different genre values, my assumption was that it would
> insert multiple values for genre for the same row key.
> But I what I mean from your comment is that the different genre values are
> being overwritten at the map stage itself and not even reaching reducer. Am
> I right?
> Yes, I am reusing mapwritable but doesn't output.collect ensures that the
> <k,v> has been collected safely and if I reuse mapwritable with some
> change,
> it will be collected again ? If not, do I have to instantiate new
> MapWritable again for it to do that or there is a better way ?
> Thanks for the comments.
>
> On Sat, Nov 29, 2008 at 12:44 AM, stack <[EMAIL PROTECTED]> wrote:
>
> > Your output.collect is inside the for loop. Is that your intent?
> >
> > Also, be careful reusing objects emitted on output.collect. You are
> > reusing the MapWritable here, which means you are adding every genre
> > value against the same genre:genre key; my guess is that only the
> > last one added makes it over to the reduce.
> >
> > St.Ack
> >
> > On Fri, Nov 28, 2008 at 3:59 PM, Nishant Khurana <[EMAIL PROTECTED]> wrote:
> >
> > > Hi Stack,
> > > When I try to add multiple values to the same column, I can't see
> > > them when I scan the table. I did what you suggested; here is the
> > > code I have written:
> > >
> > > public class UploadMoviesInfo extends Configured implements Tool
> > > {
> > >     public static class MapClass extends MapReduceBase implements
> > >             Mapper<LongWritable, Text, IntWritable, MapWritable>
> > >     {
> > >         public void map(LongWritable key, Text value,
> > >                 OutputCollector<IntWritable, MapWritable> output,
> > >                 Reporter reporter) throws IOException
> > >         {
> > >             String line = value.toString();
> > >             String[] result = line.split("%");
> > >             MapWritable mw = new MapWritable();
> > >             mw.put(new Text("name:name"), new Text(result[1].toString()));
> > >             mw.put(new Text("rating_value:rating_value"), new Text(result[2].toString()));
> > >             mw.put(new Text("country:country"), new Text(result[3].toString()));
> > >             String[] genres = result[4].split(",");
> > >             int b = new Integer(result[0]).intValue();
> > >             IntWritable iw = new IntWritable(b);
> > >             for (int i = 0; i < genres.length; i++)
> > >             {
> > >                 mw.put(new Text("genre:genre"), new Text(genres[i]));
> > >                 output.collect(iw, mw);
> > >             }
> > >         }
> > >     }
> > >
> > >     public static class ReduceClass extends TableReduce<IntWritable, MapWritable>
> > >     {
> > >         @Override
> > >         public void reduce(IntWritable key, Iterator<MapWritable> values,
> > >                 OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
> > >                 Reporter reporter) throws IOException
> > >         {
> > >             reporter.setStatus("Reducer committing " + key);
> > >             ImmutableBytesWritable ibw = new ImmutableBytesWritable(Bytes.toBytes(key.get()));
> > >             BatchUpdate outval = new BatchUpdate(Bytes.toBytes(key.get()));
> > >             while (values.hasNext())
> > >             {
> > >                 MapWritable hmw = new MapWritable(values.next());
> > >                 outval.put("rating_value:", Bytes.toBytes(hmw.get(new Text("rating_value:rating_value")).toString()));
> > >                 outval.put("name:", Bytes.toBytes(hmw.get(new Text("name:name")).toString()));
> > >                 outval.put("country:", Bytes.toBytes(hmw.get(new Text("country:country")).toString()));
> > >                 outval.put("genre:", Bytes.toBytes(hmw.get(new Text("genre:genre")).toString()));
> > >                 output.collect(ibw, outval);
> > >             }
> > >         }
> > >     }
> > >
> > >
> > >
> > > The text file I am parsing looks like this:
> > > 1808512447%Never Die Alone%A%United States%Action/Adventure, Thriller, Crime/Gangster, Adaptation
> > > 1807776058%Lilo and Stitch%PG-13%United States%Comedy, Kids/Family, Science Fiction/Fantasy, Animation
> > > 1808467879%Something's Gotta Give%PG-13%United States%Comedy, Romance
> > > 1809809725%Aqua Teen Hunger Force Colon Movie Film for Theaters%PG%United States%Comedy, Animation, Adaptation
> > > 1809423256%Lady Chatterley%PG-13%France%Art/Foreign, Drama, Adaptation
> > > 1808573131%The Blind Swordsman: Zatoichi%PG-13%Japan%Action/Adventure, Art/Foreign, Drama
> > > 1809374864%Ossessione%PG-13%Italy%Drama
> > > 1808746739%Love%Unrated%United States%Thriller
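[Editor's note: a standalone check of the `%` and comma split against the first sample line above (the class name is illustrative). It also surfaces a subtle detail: every genre after the first keeps a leading space unless trimmed, so the mapper as written stores values like `" Thriller"`.]

```java
public class SplitCheck {
    public static void main(String[] args) {
        String line = "1808512447%Never Die Alone%A%United States%"
                + "Action/Adventure, Thriller, Crime/Gangster, Adaptation";
        String[] result = line.split("%");
        System.out.println(result.length); // 5 fields, as the mapper expects

        String[] genres = result[4].split(",");
        // ", Thriller" splits to " Thriller": trim before storing,
        // or the saved genre values carry a leading space.
        for (String g : genres) {
            System.out.println("[" + g.trim() + "]");
        }
    }
}
```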
> > >
> > > So according to this, my genre column should have four genres
> > > (comma separated) for the first movie, but I only find one when I
> > > scan the table.
> > >
> > > Please let me know if I am doing something wrong. Also, about my
> > > query below: the IntWritables get changed to those escape
> > > characters, and then I am unable to use the HBase shell to query
> > > the data. Is there a workaround?
> > >
> > > Thanks
> > >
> > >
> > > On Fri, Nov 28, 2008 at 3:50 PM, Nishant Khurana <[EMAIL PROTECTED]> wrote:
> > >
> > > > Thanks,
> > > > It worked :) One more question: when I store integer values as
> > > > row keys or as column values and run a scan from the HBase shell,
> > > > they come out like this:
> > > > \000\000C|    column=year:, timestamp=1227905036961, value=1999
> > > > \000\000C~    column=name:, timestamp=1227905036962, value=The 39 Steps
> > > > \000\000C~    column=yahoo_movie_id:, timestamp=1227905036962, value=k{I\357\277\275
> > > > \000\000C~    column=year:, timestamp=1227905036962, value=1935
> > > > \000\000C\200 column=name:, timestamp=1227905036962, value=Prophecy
> > > > \000\000C\200 column=yahoo_movie_id:, timestamp=1227905036962, value=k\357\277\275\n@
> > > > \000\000C\200 column=year:, timestamp=1227905036962, value=1979
> > > >
> > > > Notice the first column (the row key) and the value, both of
> > > > which were integers. Is it because they get converted to
> > > > ImmutableBytesWritable that they look like this? Can I store them
> > > > in readable form?
> > > > Thanks
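[Editor's note: a standalone sketch of why the keys render this way. `Bytes.toBytes(int)` produces a 4-byte big-endian encoding, and the shell prints non-printable bytes as octal escapes; for example, `\000\000C|` in the scan above is the encoding of 17276 (0x0000437C). The class name and the `render` helper, which mimics that escaping, are illustrative.]

```java
import java.nio.ByteBuffer;

public class KeyEncoding {
    // Renders bytes roughly the way the shell does: printable ASCII
    // as-is, everything else as a three-digit octal escape.
    static String render(byte[] bytes) {
        StringBuilder sb = new StringBuilder();
        for (byte b : bytes) {
            int v = b & 0xff;
            if (v >= 32 && v < 127) {
                sb.append((char) v);
            } else {
                sb.append(String.format("\\%03o", v));
            }
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        int rowKey = 17276; // example id; 0x0000437C in hex

        // A 4-byte big-endian encoding, like Bytes.toBytes(int) produces.
        byte[] binary = ByteBuffer.allocate(4).putInt(rowKey).array();
        System.out.println(render(binary)); // \000\000C|

        // Storing the decimal string instead keeps the shell output
        // readable, at the cost of lexicographic rather than numeric
        // row ordering.
        System.out.println(Integer.toString(rowKey)); // 17276
    }
}
```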
> > > >
> > > >
> > > >
> > > >
> > > > On Fri, Nov 28, 2008 at 3:08 PM, stack <[EMAIL PROTECTED]> wrote:
> > > >
> > > >> How is the job being set up? I suspect you are calling
> > > >> initTableReduceJob in your job setup. Look at what it does: it
> > > >> sets the reduce key type. Maybe after calling it, reset the map
> > > >> output key type to IntWritable.
> > > >> St.Ack
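[Editor's note: a sketch of what that job setup might look like. Only `initTableReduceJob` is named in the thread; the helper class, table name, and surrounding calls are assumptions against the 0.18-era `JobConf` API, not compiled here.]

```java
// Sketch only -- assumes the old mapred-style Hadoop/HBase APIs
// discussed in this thread.
JobConf conf = new JobConf(UploadMoviesList.class);
conf.setMapperClass(MapClass.class);

// Sets up ReduceClass as a TableReduce and, among other things, sets
// the output key class to ImmutableBytesWritable, which then also
// governs the map output key unless overridden.
TableMapReduceUtil.initTableReduceJob("movies", ReduceClass.class, conf);

// Override afterwards so the map output types match what MapClass
// actually emits.
conf.setMapOutputKeyClass(IntWritable.class);
conf.setMapOutputValueClass(MapWritable.class);
```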
> > > >>
> > > >>
> > > >>
> > > >> On Fri, Nov 28, 2008 at 11:48 AM, Nishant Khurana <[EMAIL PROTECTED]> wrote:
> > > >>
> > > >> > Hi,
> > > >> > I am trying to run a MapReduce job which parses a text file
> > > >> > and fills an HBase table. Here is the code:
> > > >> >
> > > >> >
> > > >> > public class UploadMoviesList extends Configured implements Tool
> > > >> > {
> > > >> >     public static class MapClass extends MapReduceBase implements
> > > >> >             Mapper<LongWritable, Text, IntWritable, MapWritable>
> > > >> >     {
> > > >> >         public void map(LongWritable key, Text value,
> > > >> >                 OutputCollector<IntWritable, MapWritable> output,
> > > >> >                 Reporter reporter) throws IOException
> > > >> >         {
> > > >> >             String line = value.toString();
> > > >> >             String[] result = line.split("%");
> > > >> >             MapWritable mw = new MapWritable();
> > > >> >             mw.put(new Text("year:year"), new Text(result[1].toString()));
> > > >> >             mw.put(new Text("name:name"), new Text(result[2].toString()));
> > > >> >             int a = new Integer(result[3]).intValue();
> > > >> >             mw.put(new Text("y_movie_id:y_movie_id"), new IntWritable(a));
> > > >> >             int b = new Integer(result[0]).intValue();
> > > >> >             output.collect(new IntWritable(b), mw);
> > > >> >         }
> > > >> >     }
> > > >> >
> > > >> >     public static class ReduceClass extends TableReduce<IntWritable, MapWritable>
> > > >> >     {
> > > >> >         @Override
> > > >> >         public void reduce(IntWritable key, Iterator<MapWritable> values,
> > > >> >                 OutputCollector<ImmutableBytesWritable, BatchUpdate> output,
> > > >> >                 Reporter reporter) throws IOException
> > > >> >         {
> > > >> >             reporter.setStatus("Reducer committing " + key);
> > > >> >             ImmutableBytesWritable ibw = new ImmutableBytesWritable(Bytes.toBytes(key.get()));
> > > >> >             BatchUpdate outval = new BatchUpdate(Bytes.toBytes(key.get()));
> > > >> >             while (values.hasNext())
> > > >> >             {
> > > >> >                 MapWritable hmw = new MapWritable(values.next());
> > > >> >                 outval.put("year:year", Bytes.toBytes(hmw.get("year:year").toString()));
> > > >> >                 outval.put("name:name", Bytes.toBytes(hmw.get("name:name").toString()));
> > > >> >                 IntWritable iw = (IntWritable) (hmw.get("y_movie_id:y_movie_id"));
> > > >> >                 outval.put("y_movie_id:y_movie_id", Bytes.toBytes(iw.get()));
> > > >> >                 output.collect(ibw, outval);
> > > >> >             }
> > > >> >         }
> > > >> >     }
> > > >> >
> > > >> >
> > > >> > When I try to run it, I get the following exception:
> > > >> >
> > > >> > 08/11/28 14:42:27 INFO mapred.JobClient: Task Id : attempt_200811281158_0005_m_000001_0, Status : FAILED
> > > >> > java.io.IOException: Type mismatch in key from map: expected org.apache.hadoop.hbase.io.ImmutableBytesWritable, recieved org.apache.hadoop.io.IntWritable
> > > >> >         at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:415)
> > > >> >         at dist_q_data.UploadMoviesList$MapClass.map(UploadMoviesList.java:45)
> > > >> >         at dist_q_data.UploadMoviesList$MapClass.map(UploadMoviesList.java:1)
> > > >> >         at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:47)
> > > >> >         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:227)
> > > >> >         at org.apache.hadoop.mapred.TaskTracker$Child.main(TaskTracker.java:2207)
> > > >> >
> > > >> > I don't know why it expects an ImmutableBytesWritable key. Any
> > > >> > suggestions?
> > > >> > Thanks
> > > >> >
> > > >> > --
> > > >> > Nishant Khurana
> > > >> > Candidate for Masters in Engineering (Dec 2009)
> > > >> > Computer and Information Science
> > > >> > School of Engineering and Applied Science
> > > >> > University of Pennsylvania
> > > >> >
> > > >>
> > > >
> > > >
> > > >
> > >
> > >
> > >
> >
>
>
>