Author: stack Date: Thu Nov 29 16:01:25 2007 New Revision: 599643 URL: http://svn.apache.org/viewvc?rev=599643&view=rev Log: HADOOP-2234 TableInputFormat erroneously aggregates map values
Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=599643&r1=599642&r2=599643&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original) +++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Thu Nov 29 16:01:25 2007 @@ -40,6 +40,7 @@ HADOOP-2253 getRow can return HBASE::DELETEVAL cells (Bryan Duxbury via Stack) HADOOP-2295 Fix assigning a region to multiple servers + HADOOP-2234 TableInputFormat erroneously aggregates map values IMPROVEMENTS HADOOP-2401 Add convenience put method that takes writable Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java?rev=599643&r1=599642&r2=599643&view=diff ============================================================================== --- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java (original) +++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableInputFormat.java Thu Nov 29 16:01:25 2007 @@ -48,8 +48,7 @@ * Convert HBase tabular data into a format that is consumable by Map/Reduce */ public class TableInputFormat -implements InputFormat<HStoreKey, MapWritable>, JobConfigurable { - +implements InputFormat<HStoreKey, MapWritable>, JobConfigurable { static final Logger LOG = Logger.getLogger(TableInputFormat.class.getName()); /** @@ -67,9 +66,9 @@ * return (HStoreKey, MapWritable<Text, ImmutableBytesWritable>) pairs */ class TableRecordReader implements RecordReader<HStoreKey, MapWritable> { - private 
HScannerInterface m_scanner; - private SortedMap<Text, byte[]> m_row; // current buffer - private Text m_endRow; + private final HScannerInterface m_scanner; + // current buffer + private final SortedMap<Text, byte[]> m_row = new TreeMap<Text, byte[]>(); /** * Constructor @@ -78,14 +77,15 @@ * @throws IOException */ public TableRecordReader(Text startRow, Text endRow) throws IOException { - m_row = new TreeMap<Text, byte[]>(); - m_scanner = m_table.obtainScanner(m_cols, startRow); - m_endRow = endRow; + if (endRow != null && endRow.getLength() > 0) { + this.m_scanner = m_table.obtainScanner(m_cols, startRow, endRow); + } else { + this.m_scanner = m_table.obtainScanner(m_cols, startRow); + } } - /** {@inheritDoc} */ public void close() throws IOException { - m_scanner.close(); + this.m_scanner.close(); } /** @@ -132,20 +132,14 @@ */ @SuppressWarnings("unchecked") public boolean next(HStoreKey key, MapWritable value) throws IOException { - m_row.clear(); + this.m_row.clear(); HStoreKey tKey = key; - boolean hasMore = m_scanner.next(tKey, m_row); - - if(hasMore) { - if(m_endRow.getLength() > 0 && - (tKey.getRow().compareTo(m_endRow) >= 0)) { - - hasMore = false; - - } else { - for(Map.Entry<Text, byte[]> e: m_row.entrySet()) { - value.put(e.getKey(), new ImmutableBytesWritable(e.getValue())); - } + boolean hasMore = this.m_scanner.next(tKey, this.m_row); + if (hasMore) { + // Clear value to remove content added by previous call to next. 
+ value.clear(); + for (Map.Entry<Text, byte[]> e: this.m_row.entrySet()) { + value.put(e.getKey(), new ImmutableBytesWritable(e.getValue())); } } return hasMore; @@ -153,12 +147,11 @@ } - /** {@inheritDoc} */ public RecordReader<HStoreKey, MapWritable> getRecordReader( InputSplit split, @SuppressWarnings("unused") JobConf job, - @SuppressWarnings("unused") Reporter reporter) throws IOException { - + @SuppressWarnings("unused") Reporter reporter) + throws IOException { TableSplit tSplit = (TableSplit)split; return new TableRecordReader(tSplit.getStartRow(), tSplit.getEndRow()); } @@ -185,7 +178,6 @@ return splits; } - /** {@inheritDoc} */ public void configure(JobConf job) { Path[] tableNames = job.getInputPaths(); m_tableName = new Text(tableNames[0].getName()); @@ -202,21 +194,17 @@ } } - /** {@inheritDoc} */ public void validateInput(JobConf job) throws IOException { - // expecting exactly one path - Path[] tableNames = job.getInputPaths(); if(tableNames == null || tableNames.length > 1) { throw new IOException("expecting one table name"); } // expecting at least one column - String colArg = job.get(COLUMN_LIST); if(colArg == null || colArg.length() == 0) { throw new IOException("expecting at least one column"); } } -} +} \ No newline at end of file