Hi,
I'm trying to write a M/R job to do the following:
- Scan a given table and collect unique column names
- Write those column names to another table with the source table name
as the row key, and columns in the format (Cols:<sourceColName>,
<sourceColName)
I'm not sure what I'm doing wrong. It appears that the mapper is
configured and then immediately closed w/o scanning any rows. I've
verified there is data in my source table. I don't see the reduce
task run at all. Attached is the code and below is the log output
from running the task. Thanks in advance for any pointers.
2009-02-10 11:14:55,087 INFO (main) JvmMetrics - Initializing JVM
Metrics with processName=JobTracker, sessionId=
2009-02-10 11:14:55,151 WARN (main) JobClient - No job jar file set.
User classes may not be found. See JobConf(Class) or
JobConf#setJar(String).
2009-02-10 11:14:55,610 INFO (main) TableInputFormatBase - split:
0->localhost:,
2009-02-10 11:14:55,840 INFO (main) JobClient - Running job: job_local_0001
2009-02-10 11:14:55,890 INFO (Thread-11) TableInputFormatBase -
split: 0->localhost:,
2009-02-10 11:14:55,931 INFO (Thread-11) MapTask - numReduceTasks: 1
2009-02-10 11:14:55,957 INFO (Thread-11) MapTask - io.sort.mb = 100
2009-02-10 11:14:56,239 INFO (Thread-11) MapTask - data buffer =
79691776/99614720
2009-02-10 11:14:56,239 INFO (Thread-11) MapTask - record buffer =
262144/327680
2009-02-10 11:14:56,863 INFO (main) JobClient - map 0% reduce 0%
2009-02-10 11:14:59,558 INFO (Thread-11)
MetadataMapReduceJob$MetadatMapper - Mapper for table
DayAheadHourlyLMP configured
2009-02-10 11:14:59,561 INFO (Thread-11)
MetadataMapReduceJob$MetadatMapper - Mapper for table
DayAheadHourlyLMP closed
2009-02-10 11:14:59,561 INFO (Thread-11) MapTask - Starting flush of
map output
2009-02-10 11:14:59,612 INFO (Thread-11) MapTask - Index: (0, 2, 6)
2009-02-10 11:14:59,613 INFO (Thread-11) TaskRunner -
Task:attempt_local_0001_m_000000_0 is done. And is in the process of
commiting
2009-02-10 11:14:59,616 INFO (Thread-11) LocalJobRunner -
2009-02-10 11:14:59,616 INFO (Thread-11) TaskRunner - Task
'attempt_local_0001_m_000000_0' done.
2009-02-10 11:14:59,648 INFO (Thread-11)
MetadataMapReduceJob$MetadataReducer - Metadata Reducer is configured
2009-02-10 11:14:59,663 INFO (Thread-11) Merger - Merging 1 sorted segments
2009-02-10 11:14:59,666 INFO (Thread-11) Merger - Down to the last
merge-pass, with 0 segments left of total size: 0 bytes
2009-02-10 11:14:59,708 INFO (Thread-11) TaskRunner -
Task:attempt_local_0001_r_000000_0 is done. And is in the process of
commiting
2009-02-10 11:14:59,709 INFO (Thread-11) LocalJobRunner - reduce > reduce
2009-02-10 11:14:59,710 INFO (Thread-11) TaskRunner - Task
'attempt_local_0001_r_000000_0' done.
2009-02-10 11:14:59,866 INFO (main) JobClient - Job complete: job_local_0001
2009-02-10 11:14:59,867 INFO (main) JobClient - Counters: 11
2009-02-10 11:14:59,867 INFO (main) JobClient - File Systems
2009-02-10 11:14:59,868 INFO (main) JobClient - Local bytes read=38422
2009-02-10 11:14:59,868 INFO (main) JobClient - Local bytes written=77300
2009-02-10 11:14:59,868 INFO (main) JobClient - Map-Reduce Framework
2009-02-10 11:14:59,868 INFO (main) JobClient - Reduce input groups=0
2009-02-10 11:14:59,868 INFO (main) JobClient - Combine output records=0
2009-02-10 11:14:59,868 INFO (main) JobClient - Map input records=0
2009-02-10 11:14:59,868 INFO (main) JobClient - Reduce output records=0
2009-02-10 11:14:59,869 INFO (main) JobClient - Map output bytes=0
2009-02-10 11:14:59,869 INFO (main) JobClient - Map input bytes=0
2009-02-10 11:14:59,869 INFO (main) JobClient - Combine input records=0
2009-02-10 11:14:59,869 INFO (main) JobClient - Map output records=0
2009-02-10 11:14:59,869 INFO (main) JobClient - Reduce input records=0
package com.enernoc.rnd.shredder.metadata;
import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MetadataMapReduceJob extends Configured implements Tool {
public static class MetadatMapper extends MapReduceBase
implements TableMap<ImmutableBytesWritable, RowResult> {
byte[] inputTableName;
ImmutableBytesWritable tableKey;
String tableName;
protected Logger log = LoggerFactory.getLogger( getClass() );
@Override
public void configure( JobConf conf ) {
Path[] tableNames = FileInputFormat.getInputPaths(conf);
if ( tableNames.length < 1 )
throw new RuntimeException("Expected table name as a FileInputFormat path");
tableName = tableNames[0].getName();
inputTableName = tableName.getBytes();
tableKey = new ImmutableBytesWritable( inputTableName );
log.info( "Mapper for table {} configured", tableName );
}
@Override
public void map(ImmutableBytesWritable key, RowResult row,
OutputCollector<ImmutableBytesWritable, RowResult> collector, Reporter reporter )
throws IOException {
log.trace( "Mapping to table {} row {}", tableName, new String(row.getRow()) );
collector.collect( tableKey, row );
}
@Override
public void close() throws IOException {
log.info( "Mapper for table {} closed", tableName );
}
}
public static class MetadataReducer extends MapReduceBase
implements TableReduce<ImmutableBytesWritable, RowResult> {
byte[] colFam = Bytes.toBytes("Cols:");
Set<ImmutableBytesWritable> colCache = new HashSet<ImmutableBytesWritable>();
protected Logger log = LoggerFactory.getLogger( getClass() );
@Override
public void reduce( ImmutableBytesWritable tableKey,
Iterator<RowResult> rowIter,
OutputCollector<ImmutableBytesWritable, BatchUpdate> collector,
Reporter reporter ) throws IOException {
BatchUpdate update = new BatchUpdate();
while ( rowIter.hasNext() ) {
RowResult row = rowIter.next();
for ( byte[] colName : row.keySet() ) {
ImmutableBytesWritable col = new ImmutableBytesWritable(colName);
if ( colCache.contains(col) ) continue;
update.put( Bytes.add( colFam, colName ),
colName );
if (log.isTraceEnabled() ) log.trace( "Found col {} for table {}",
new String(colName),
tableKey.toString() );
}
}
collector.collect( tableKey, update );
}
@Override
public void configure( JobConf conf ) {
log.info( "Metadata Reducer is configured" );
}
@Override
public void close() throws IOException {}
}
@Override
public int run( String[] args ) throws Exception {
JobConf c = new JobConf( getConf(), getClass() );
c.setJobName( getClass().getName() );
String inputTableName = args[0];
if ( inputTableName == null )
throw new RuntimeException( "Missing table name argument" );
FileInputFormat.setInputPaths(c, new Path(inputTableName) );
c.set( TableOutputFormat.OUTPUT_TABLE, "_TableData" );
c.set( TableInputFormat.COLUMN_LIST, "LMP:*" );
c.setInputFormat( TableInputFormat.class );
c.setMapperClass( MetadatMapper.class );
c.setOutputFormat( TableOutputFormat.class );
c.setReducerClass( MetadataReducer.class );
c.setNumReduceTasks(1);
JobClient.runJob(c);
return 0;
}
public static void main( String... args ) throws Exception {
ToolRunner.run( new HBaseConfiguration(), new MetadataMapReduceJob(), args );
}
}