A few comments, Naama:

Code-wise, it all looks fine to me. It looks like you are writing against hbase 0.1. As is, though, your map does effectively nothing; you might have been able to do without it altogether and just use the identity map.

FYI, the utility http://hadoop.apache.org/hbase/docs/current/org/apache/hadoop/hbase/util/Writables.html#getWritable(byte[],%20org.apache.hadoop.io.Writable) might have saved you a line.
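Roughly, the hand-rolled stream plumbing that helper replaces looks like this (a JDK-only sketch, no hbase classes needed to try it; IntRoundTrip is just an illustrative name):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class IntRoundTrip {
    public static void main(String[] args) throws IOException {
        // Serialize an int the way IntWritable.write() does: 4 big-endian bytes
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        new DataOutputStream(baos).writeInt(87);
        byte[] raw = baos.toByteArray();

        // Deserialize it again -- this is the wrapping/unwrapping that
        // Writables.getWritable(byte[], Writable) does for you in one call
        int grade = new DataInputStream(new ByteArrayInputStream(raw)).readInt();
        System.out.println(grade); // prints 87
    }
}
```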

At a higher level, I'd suggest a refactoring: do all of your work in the map phase and have no reduce phase. I suggest this because, as is, all rows emitted by the map are being sorted by the MR framework, but hbase will also sort on insert, so you'd be paying for the sort twice. Avoid paying the price of the MR sort. Do your calculation in the map and then insert the result at map time. Either emit nothing or emit a '1' for every row processed, so the MR counters tell a story about your MR job.
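A rough, untested sketch of the shape I mean, written against the same hbase 0.1-era API your code below uses (the HTable field would be opened against the grades table in the mapper's configure(); "Stats:Average" is your column name):

```java
public static class AverageMap extends TableMap<Text, IntWritable> {
  private HTable table; // open against the grades table in configure()

  @Override
  public void map(HStoreKey key, MapWritable value,
      OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    // Sum this student's grades; all scanned columns of one row arrive together
    int sum = 0, total = 0;
    for (Map.Entry<Writable, Writable> e : value.entrySet()) {
      byte[] raw = ((ImmutableBytesWritable) e.getValue()).get();
      IntWritable grade = new IntWritable();
      grade.readFields(new DataInputStream(new ByteArrayInputStream(raw)));
      sum += grade.get();
      total++;
    }

    // Insert the average straight back into hbase at map time; no reduce, no MR sort
    long writeid = table.startUpdate(key.getRow());
    table.put(writeid, new Text("Stats:Average"), new FloatWritable((float) sum / total));
    table.commit(writeid);

    // Emit a '1' per row so the MR counters tell a story about the job
    output.collect(key.getRow(), new IntWritable(1));
  }
}
```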

St.Ack


Naama Kraus wrote:
Oh, didn't know that. I also couldn't find a way to edit the Wiki; I think I am not permitted to.
Well, here is the code that does the MapReduce; I'd be glad for comments.

- Naama

/**
 * A map reduce job over {@link GradesTable}.
 * The job produces, for each student (row), the average grade of his course grades.
 * It puts the average in a separate column in the original (source) table.
 */
public class GradesTableMapReduce extends Configured implements Tool {

  /**
   * Map a row to {key, value} pairs.
   * Emit a {student, grade} pair for each course grade appearing in the student row.
   * E.g. sara {62, 45, 87} -> {sara, 62}, {sara, 45}, {sara, 87}
   */
  public static class GradesTableMap extends TableMap<Text, IntWritable> {

    @Override
    public void map(HStoreKey key, MapWritable value,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException {

      // Row id is the student name
      Text student = key.getRow();
      // Walk through the columns
      for (Map.Entry<Writable, Writable> e : value.entrySet()) {
        byte[] gradeInBytes = ((ImmutableBytesWritable) e.getValue()).get();
        DataInputStream in =
            new DataInputStream(new ByteArrayInputStream(gradeInBytes));
        IntWritable grade = new IntWritable();
        grade.readFields(in);
        // Emit the student name and a grade
        output.collect(student, grade);
      }
    }
  }

  /**
   * Reduce - compute the average of a key's values, which is the average grade of each student.
   * E.g. {sara, {62, 45, 87}} -> {sara, 64.7}
   */
  public static class GradesTableReduce extends TableReduce<Text, IntWritable> {

    // key is the student name, values are his grades
    @Override
    public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, MapWritable> output, Reporter reporter)
        throws IOException {
      // Compute grades average
      int total = 0;
      int sum = 0;
      while (values.hasNext()) {
        total++;
        sum += values.next().get();
      }
      float average = (float) sum / total; // cast before dividing to avoid integer truncation

      // We put the average as a separate column in the source table
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(baos);
      FloatWritable avgWritable = new FloatWritable(average);
      avgWritable.write(out);
      MapWritable map = new MapWritable();
      map.put(new Text(GradesTable.STATS_FAMILY + GradesTable.AVG),
              new ImmutableBytesWritable(baos.toByteArray()));
      output.collect(key, map);
    }
  }

  /**
   * Run
   */
  public int run(String[] args) throws Exception {
    JobConf jobConf = new JobConf();
    jobConf.setJobName("compute average grades");
    jobConf.setNumReduceTasks(1);

    // All columns in the course family (i.e. all grades) get into the map
    TableMap.initJob(GradesTable.TABLE_NAME, GradesTable.COURSE_FAMILY,
        GradesTableMap.class, jobConf);

    TableReduce.initJob(GradesTable.TABLE_NAME,
        GradesTableReduce.class, jobConf);

    // Map produces a value which is an IntWritable
    jobConf.setMapOutputValueClass(IntWritable.class);

    JobClient.runJob(jobConf);
    return 0;
  }

  public static void main(String [] args) throws Exception {
    ToolRunner.run(new Configuration(), new GradesTableMapReduce(), args);
  }
}
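(One subtlety in the reduce above: in Java, sum / total with two int operands truncates before the result is ever assigned to a float, so the cast has to happen before the division. A JDK-only illustration, with made-up grades:)

```java
public class AvgTruncation {
    public static void main(String[] args) {
        int sum = 62 + 45 + 87; // 194
        int total = 3;

        float truncated = sum / total;       // int division happens first -> 64.0
        float correct = (float) sum / total; // promote to float first -> ~64.67

        System.out.println(truncated); // prints 64.0
        System.out.println(correct);
    }
}
```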


==================================================

Here is the code that creates the original table (I used hbase 0.1.1)


/**
 * An HBase table of the form -
 * row id is a student name
 * column name is Course:course_name
 * cell value is the student's grade in the course 'course_name'
 *
 * There is also another column, Stats:Average. This one gets filled by a MapReduce job.
 * The cell contains the average grade of the student in all courses.
 *
 * Example:
 *
 *          Course:Math | Course:Art | Course:Sports | Stats:Average
 * ------------------------------------------------------------------
 * Dan           87     |     97     |      99       |     94.3
 * Dana         100     |    100     |      80       |     93.3
 *
 * @see GradesTableMapReduce
 */
public class GradesTable {

  public static final String TABLE_NAME = "grades";
  public static final String COURSE_FAMILY = "Course:";
  // A column family holding grades statistics
  public static final String STATS_FAMILY = "Stats:";
  // A column member holding average grade (per student)
  public static final String AVG = "Average";

  private static final String [] STUDENT_NAMES = {
    "Dan", "Dana", "Sara", "David"
  };

  private static final String [] COURSE_NAMES = {
    "Math", "Art", "Sports"
  };

  private HBaseConfiguration conf;
  private HBaseAdmin admin;
  private HTableDescriptor desc;
  // Randomly generate a grade
  private Random rand;

  public GradesTable() throws IOException {
    conf = new HBaseConfiguration();
    admin = new HBaseAdmin(conf);
    desc = new HTableDescriptor(TABLE_NAME);
    rand = new Random();
  }

  /**
   * Create table and populate with content
   */
  public void create() throws IOException {
    desc.addFamily(new HColumnDescriptor(COURSE_FAMILY));
    desc.addFamily(new HColumnDescriptor(STATS_FAMILY));
    admin.createTable(desc);
    System.out.println("Grades Table created");

    HTable table = new HTable(conf, new Text(TABLE_NAME));

    // Start an update transaction, student name is row id
    for (int i = 0; i < STUDENT_NAMES.length; i++) {
      System.out.println("<<< Row " + i + ", student: " + STUDENT_NAMES[i] + " >>>");
      Text stuName = new Text(STUDENT_NAMES[i]);
      long writeid = table.startUpdate(stuName);
      for (int j = 0; j < COURSE_NAMES.length; j++) {
        Text courseColumn = new Text(COURSE_FAMILY + COURSE_NAMES[j]);
        // Put a cell with a grade of the student in this course
        int grade = rand.nextInt(101); // uniform in [0, 100]
        table.put(writeid, courseColumn, new IntWritable(grade));
        System.out.println("Course: " + COURSE_NAMES[j] + ", grade: " + grade);
      }
      table.commit(writeid);
    }
    System.out.println("Table populated");
  }

}
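(A small aside on generating the bounded grades in create(): Random.nextInt(int) is the safe idiom for this. The Math.abs(rand.nextInt()) % 101 variant can, very rarely, go negative, because Math.abs(Integer.MIN_VALUE) overflows and stays negative. A JDK-only check:)

```java
import java.util.Random;

public class GradeBound {
    public static void main(String[] args) {
        Random rand = new Random();

        // nextInt(101) is uniform over [0, 100] and can never be negative
        int grade = rand.nextInt(101);
        System.out.println(grade);

        // The corner case in the Math.abs idiom: abs(Integer.MIN_VALUE)
        // has no positive int representation, so it stays negative
        System.out.println(Math.abs(Integer.MIN_VALUE)); // prints -2147483648
    }
}
```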



On Sun, Jun 22, 2008 at 5:39 PM, Daniel Blaisdell <[EMAIL PROTECTED]> wrote:

More than likely, the mailing list does not allow attachments.

On Sun, Jun 22, 2008 at 8:49 AM, Naama Kraus <[EMAIL PROTECTED]> wrote:

Trying to send over files again ...

Naama

--
oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo
"If you want your children to be intelligent, read them fairy tales. If you want them to be more intelligent, read them more fairy tales." (Albert Einstein)




