Hi,

I'm trying to write a M/R job to do the following:

- Scan a given table and collect unique column names
- Write those column names to another table with the source table name
as the row key, and columns in the format (Cols:<sourceColName>,
<sourceColName)

I'm not sure what I'm doing wrong.  It appears that the mapper is
configured and then immediately closed w/o scanning any rows.  I've
verified there is data in my source table.  I don't see the reduce
task run at all.  Attached is the code and below is the log output
from running the task.  Thanks in advance for any pointers.

2009-02-10 11:14:55,087 INFO  (main)  JvmMetrics - Initializing JVM
Metrics with processName=JobTracker, sessionId=
2009-02-10 11:14:55,151 WARN  (main)  JobClient - No job jar file set.
 User classes may not be found. See JobConf(Class) or
JobConf#setJar(String).
2009-02-10 11:14:55,610 INFO  (main)  TableInputFormatBase - split:
0->localhost:,
2009-02-10 11:14:55,840 INFO  (main)  JobClient - Running job: job_local_0001
2009-02-10 11:14:55,890 INFO  (Thread-11)  TableInputFormatBase -
split: 0->localhost:,
2009-02-10 11:14:55,931 INFO  (Thread-11)  MapTask - numReduceTasks: 1
2009-02-10 11:14:55,957 INFO  (Thread-11)  MapTask - io.sort.mb = 100
2009-02-10 11:14:56,239 INFO  (Thread-11)  MapTask - data buffer =
79691776/99614720
2009-02-10 11:14:56,239 INFO  (Thread-11)  MapTask - record buffer =
262144/327680
2009-02-10 11:14:56,863 INFO  (main)  JobClient -  map 0% reduce 0%
2009-02-10 11:14:59,558 INFO  (Thread-11)
MetadataMapReduceJob$MetadatMapper - Mapper for table
DayAheadHourlyLMP configured
2009-02-10 11:14:59,561 INFO  (Thread-11)
MetadataMapReduceJob$MetadatMapper - Mapper for table
DayAheadHourlyLMP closed
2009-02-10 11:14:59,561 INFO  (Thread-11)  MapTask - Starting flush of
map output
2009-02-10 11:14:59,612 INFO  (Thread-11)  MapTask - Index: (0, 2, 6)
2009-02-10 11:14:59,613 INFO  (Thread-11)  TaskRunner -
Task:attempt_local_0001_m_000000_0 is done. And is in the process of
commiting
2009-02-10 11:14:59,616 INFO  (Thread-11)  LocalJobRunner -
2009-02-10 11:14:59,616 INFO  (Thread-11)  TaskRunner - Task
'attempt_local_0001_m_000000_0' done.
2009-02-10 11:14:59,648 INFO  (Thread-11)
MetadataMapReduceJob$MetadataReducer - Metadata Reducer is configured
2009-02-10 11:14:59,663 INFO  (Thread-11)  Merger - Merging 1 sorted segments
2009-02-10 11:14:59,666 INFO  (Thread-11)  Merger - Down to the last
merge-pass, with 0 segments left of total size: 0 bytes
2009-02-10 11:14:59,708 INFO  (Thread-11)  TaskRunner -
Task:attempt_local_0001_r_000000_0 is done. And is in the process of
commiting
2009-02-10 11:14:59,709 INFO  (Thread-11)  LocalJobRunner - reduce > reduce
2009-02-10 11:14:59,710 INFO  (Thread-11)  TaskRunner - Task
'attempt_local_0001_r_000000_0' done.
2009-02-10 11:14:59,866 INFO  (main)  JobClient - Job complete: job_local_0001
2009-02-10 11:14:59,867 INFO  (main)  JobClient - Counters: 11
2009-02-10 11:14:59,867 INFO  (main)  JobClient -   File Systems
2009-02-10 11:14:59,868 INFO  (main)  JobClient -     Local bytes read=38422
2009-02-10 11:14:59,868 INFO  (main)  JobClient -     Local bytes written=77300
2009-02-10 11:14:59,868 INFO  (main)  JobClient -   Map-Reduce Framework
2009-02-10 11:14:59,868 INFO  (main)  JobClient -     Reduce input groups=0
2009-02-10 11:14:59,868 INFO  (main)  JobClient -     Combine output records=0
2009-02-10 11:14:59,868 INFO  (main)  JobClient -     Map input records=0
2009-02-10 11:14:59,868 INFO  (main)  JobClient -     Reduce output records=0
2009-02-10 11:14:59,869 INFO  (main)  JobClient -     Map output bytes=0
2009-02-10 11:14:59,869 INFO  (main)  JobClient -     Map input bytes=0
2009-02-10 11:14:59,869 INFO  (main)  JobClient -     Combine input records=0
2009-02-10 11:14:59,869 INFO  (main)  JobClient -     Map output records=0
2009-02-10 11:14:59,869 INFO  (main)  JobClient -     Reduce input records=0
package com.enernoc.rnd.shredder.metadata;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.mapred.TableInputFormat;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MetadataMapReduceJob extends Configured implements Tool {
	

	public static class MetadatMapper extends MapReduceBase
			implements TableMap<ImmutableBytesWritable, RowResult> {
		byte[] inputTableName;
		ImmutableBytesWritable tableKey;
		String tableName;
		protected Logger log = LoggerFactory.getLogger( getClass() );
		
		@Override
		public void configure( JobConf conf ) {
			Path[] tableNames = FileInputFormat.getInputPaths(conf);
			if ( tableNames.length < 1 )
				throw new RuntimeException("Expected table name as a FileInputFormat path");
			tableName = tableNames[0].getName();
			inputTableName = tableName.getBytes();
			tableKey = new ImmutableBytesWritable( inputTableName );
			log.info( "Mapper for table {} configured", tableName );
		}

		@Override
		public void map(ImmutableBytesWritable key, RowResult row,
				OutputCollector<ImmutableBytesWritable, RowResult> collector, Reporter reporter )
				throws IOException {
			log.trace( "Mapping to table {} row {}", tableName, new String(row.getRow()) );
			collector.collect( tableKey, row );
		}
		
		@Override
		public void close() throws IOException {
			log.info( "Mapper for table {} closed", tableName );
		}
	}
	
	
	public static class MetadataReducer extends MapReduceBase 
			implements TableReduce<ImmutableBytesWritable, RowResult> {

		byte[] colFam = Bytes.toBytes("Cols:");
		Set<ImmutableBytesWritable> colCache = new HashSet<ImmutableBytesWritable>(); 
		protected Logger log = LoggerFactory.getLogger( getClass() );
		
		@Override
		public void reduce( ImmutableBytesWritable tableKey,
				Iterator<RowResult> rowIter,
				OutputCollector<ImmutableBytesWritable, BatchUpdate> collector,
				Reporter reporter ) throws IOException {

			BatchUpdate update = new BatchUpdate();
			while ( rowIter.hasNext() ) {
				RowResult row = rowIter.next();
				for ( byte[] colName : row.keySet() ) {
					ImmutableBytesWritable col = new ImmutableBytesWritable(colName);
					if ( colCache.contains(col) ) continue;
					
					update.put( Bytes.add( colFam, colName ), 
							 colName );
					if (log.isTraceEnabled() ) log.trace( "Found col {} for table {}", 
							new String(colName), 
							tableKey.toString() );
				}
			}
			collector.collect( tableKey, update );
		}

		@Override
		public void configure( JobConf conf ) {
			log.info( "Metadata Reducer is configured" );
		}

		@Override
		public void close() throws IOException {}
	}
	
	@Override
	public int run( String[] args ) throws Exception {
		JobConf c = new JobConf( getConf(), getClass() );
	    c.setJobName( getClass().getName() );
	    
	    String inputTableName = args[0];
	    if ( inputTableName == null )
	    	throw new RuntimeException( "Missing table name argument" );
	    
	    FileInputFormat.setInputPaths(c, new Path(inputTableName) );
	    c.set( TableOutputFormat.OUTPUT_TABLE, "_TableData" );
	    c.set( TableInputFormat.COLUMN_LIST, "LMP:*" );

	    c.setInputFormat( TableInputFormat.class );
	    c.setMapperClass( MetadatMapper.class );
	    c.setOutputFormat( TableOutputFormat.class );
	    c.setReducerClass( MetadataReducer.class );
	    c.setNumReduceTasks(1);
		
	    JobClient.runJob(c);
		return 0;
	}
	
	public static void main( String... args ) throws Exception {
		ToolRunner.run( new HBaseConfiguration(), new MetadataMapReduceJob(), args );
	}
}

Reply via email to