Hi,
Here is a version following the last comments.
Naama
/**
* <pre>
* 'Toy tables' for experimenting with MapReduce over HBase
*
* grades table - a HBase table of the form -
* raw id is a student name
* column name is Course:course_name
* cell value is the student's grade in the course 'course_name'
*
* Exmaple:
*
* Course:Math | Course:Art | Course:Sports
* ----------------------------------------------
* Dan 87 97 99
* Dana 100 100 80
*
* =======================================
*
* courses table - a HBase table of the form -
* raw id is a course name
* column name is Stats:Average
* cell value is the average grade in that course, computed by a map reduce
job
*
* Exmaple:
*
* Stats:Average
* --------------
* Art 86
* Match 77
* </pre>
* @see GradesTableMapReduce
*
*
*/
public class GradesTable {
public static final String GRADES_TABLE_NAME = "grades";
public static final String COURSE_TABLE_NAME = "courses";
public static final String COURSE_FAMILY = "Course:";
// A column family holding grades statistics
public static final String STATS_FAMILY = "Stats:";
// A column member holding average grade in course
public static final String AVG = "Average";
private static final String [] STUDENT_NAMES = {
"Dan", "Dana", "Sara", "David"
};
private static final String [] COURSE_NAMES = {
"Math", "Art", "Sports"
};
private HBaseConfiguration conf;
private HBaseAdmin admin;
private HTableDescriptor grades_desc;
private HTableDescriptor courses_desc;
// Randomly generate a grade
private Random rand;
private static final Log LOG =
LogFactory.getLog(GradesTable.class.getName());
public GradesTable() throws IOException {
conf = new HBaseConfiguration();
admin = new HBaseAdmin(conf);
grades_desc = new HTableDescriptor(GRADES_TABLE_NAME);
courses_desc = new HTableDescriptor(COURSE_TABLE_NAME);
rand = new Random();
}
/**
* Create tables and populate with content
*/
public void create() throws IOException {
grades_desc.addFamily(new HColumnDescriptor(COURSE_FAMILY));
courses_desc.addFamily(new HColumnDescriptor(STATS_FAMILY));
admin.createTable(grades_desc);
admin.createTable(courses_desc);
LOG.info("Tables created");
// Populate grades table with students and their grades in courses
HTable table = new HTable(conf, new Text(GRADES_TABLE_NAME));
// Start an update transaction, student name is row id
for (int i = 0; i < STUDENT_NAMES.length; i++) {
LOG.info("<<< Row " + i + ", student: " + STUDENT_NAMES[i] + " >>>");
Text stuName = new Text(STUDENT_NAMES[i]);
long writeid = table.startUpdate(stuName);
for (int j = 0; j < COURSE_NAMES.length; j++) {
Text courseColumn = new Text(COURSE_FAMILY + COURSE_NAMES[j]);
// Put a cell with a student's grade in this course
int grade = Math.abs(rand.nextInt()) % 101;
table.put(writeid, courseColumn, new IntWritable(grade));
LOG.info("Course: " + COURSE_NAMES[j] + ", grade: " + grade);
}
table.commit(writeid);
}
LOG.info("Grades Table populated");
}
public static void main(String [] args) {
try {
GradesTable gradesTable = new GradesTable();
gradesTable.create();
} catch (IOException e) {
LOG.fatal("An exception occured", e);
}
}
=========================================================
/**
* A map reduce job over [EMAIL PROTECTED] GradesTable}
* The job produces for each course the average grade in that course.
* It puts the average in a separate table which holds course statistics.
*
*/
public class GradesTableMapReduce extends Configured implements Tool {
/**
* Map a row to {key, value} pairs.
* Emit a {course, grade} pair for each course grade appearing in the
student row.
* E.g. Sara {Math:62, Art:45, Sports:87} -> {Math, 62}, {Art, 45},
{Sports, 87}
*
*/
public static class GradesTableMap extends TableMap<Text, IntWritable> {
@Override
public void map(HStoreKey key, MapWritable value,
OutputCollector<Text, IntWritable> output, Reporter reporter) throws
IOException {
// Walk through the columns
for (Map.Entry<Writable, Writable> e: value.entrySet()) {
// Column name is course name
Text course = (Text) e.getKey();
// Remove the family prefix
String courseStr = HStoreKey.extractQualifier(course).toString();
course = new Text(courseStr);
byte [] gradeInBytes = ((ImmutableBytesWritable)
e.getValue()).get();
IntWritable grade = new IntWritable();
Writables.getWritable(gradeInBytes, grade);
// Emit course name and a grade
output.collect(course, grade);
}
}
}
/**
* Reduce - compute an average of key's values which is actually the
average grade in each course.
* E.g. {Math, {62, 45, 87}} -> {Math, 65.6}
*
*/
public static class GradesTableReduce extends TableReduce<Text,
IntWritable> {
@Override
// key is course name, values are grades in the course
public void reduce(Text key, Iterator<IntWritable> values,
OutputCollector<Text, MapWritable> output, Reporter reporter)
throws IOException {
// Compute grades average
int total = 0;
int sum = 0;
while (values.hasNext()) {
total++;
sum += values.next().get();
}
float average = sum / total;
// We put the average as a separate column in the courses table
ByteArrayOutputStream baos = new ByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(baos);
FloatWritable avgWritable = new FloatWritable(average);
avgWritable.write(out);
MapWritable map = new MapWritable();
map.put(new Text(GradesTable.STATS_FAMILY + GradesTable.AVG),
new ImmutableBytesWritable(baos.toByteArray()));
output.collect(key, map);
}
}
/**
* Run
*/
public int run(String[] args) throws Exception {
JobConf jobConf = new JobConf();
jobConf.setJobName("compute average grades");
jobConf.setNumReduceTasks(1);
// All columns in the course family (i.e. all grades) get into the map
TableMap.initJob(GradesTable.GRADES_TABLE_NAME,
GradesTable.COURSE_FAMILY,
GradesTableMap.class, jobConf);
// Reduce output (course average grade) is put in the courses table
TableReduce.initJob(GradesTable.COURSE_TABLE_NAME,
GradesTableReduce.class, jobConf);
// Map produces a value which is an IntWritable
jobConf.setMapOutputValueClass(IntWritable.class);
JobClient.runJob(jobConf);
return 0;
}
public static void main(String [] args) throws Exception {
ToolRunner.run(new Configuration(), new GradesTableMapReduce(), args);
}
}
On Thu, Jul 3, 2008 at 9:44 AM, Naama Kraus <[EMAIL PROTECTED]> wrote:
> Thanks St.Ack for the further comments and for putting a link in the Wiki.
> Naama
>
>
> On Tue, Jul 1, 2008 at 8:38 PM, stack <[EMAIL PROTECTED]> wrote:
>
>> Comments in-line below:
>>
>> Naama Kraus wrote:
>>
>>> Here is an updated code
>>>
>>> Naama
>>>
>>> /**
>>> * <pre>
>>> * 'Toy tables' for experiencing with MapReduce over HBase
>>>
>>>
>> Do you mean 'experimenting' in the above?
>>
>> ....
>>
>>> public void create() throws IOException {
>>>
>>>
>>
>> Where does this method get called? I don't see it.
>>
>>
>> System.out.println("Grades Table populated");
>>>
>>>
>>
>> Do you want to set up a logger to do the outputting instead? See the head
>> of (most) hbase classes for example. Look for 'LOG'.
>>
>>
>> }
>>> }
>>>
>>>
>>> ====================================================
>>>
>>> /**
>>> * A map reduce job over [EMAIL PROTECTED] GradesTable}
>>> * The job produces for each course the average grade in that course.
>>> * It puts the average in a separate table which holds course statistics.
>>> *
>>> */
>>> public class GradesTableMapReduce extends Configured implements Tool {
>>>
>>> /**
>>> * Map a row to {key, value} pairs.
>>> * Emit a {course, grade} pair for each course grade appearing in the
>>> student row.
>>> * E.g. Sara {Math:62, Art:45, Sports:87} -> {Math, 62}, {Art, 45},
>>> {Sports, 87}
>>> *
>>> */
>>> public static class GradesTableMap extends TableMap<Text, IntWritable> {
>>>
>>> @Override
>>> public void map(HStoreKey key, MapWritable value,
>>> OutputCollector<Text, IntWritable> output, Reporter reporter)
>>> throws
>>> IOException {
>>>
>>> // Walk through the columns
>>> for (Map.Entry<Writable, Writable> e: value.entrySet()) {
>>> // Column name is course name
>>> Text course = (Text) e.getKey();
>>> // Remove the family prefix
>>> String courseStr = course.toString();
>>> courseStr =
>>> courseStr.substring(courseStr.indexOf(':') + 1);
>>>
>>>
>>
>> There may be utility in HStoreKey to do the above stripping of the column
>> family (getQualifier?).
>>
>> course = new Text(courseStr);
>>> byte [] gradeInBytes = ((ImmutableBytesWritable)
>>> e.getValue()).get();
>>> DataInputStream in = new DataInputStream(new
>>> ByteArrayInputStream(gradeInBytes));
>>> IntWritable grade = new IntWritable();
>>> grade.readFields(in);
>>>
>>>
>>
>> You could have used Writables.getWritable above and saved yourself a few
>> lines (Not important).
>>
>> Otherwise, this class is an excellent example of using MR + HBase. I've
>> added a pointer to it up on the wiki under the MR+HBase page (update the link
>> if you update your code).
>>
>> Thanks,
>> St.Ack
>>
>
>
>
> --
> oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo
> 00 oo 00 oo
> "If you want your children to be intelligent, read them fairy tales. If you
> want them to be more intelligent, read them more fairy tales." (Albert
> Einstein)
>
--
oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo 00 oo
00 oo 00 oo
"If you want your children to be intelligent, read them fairy tales. If you
want them to be more intelligent, read them more fairy tales." (Albert
Einstein)