Author: stack
Date: Sat Jan 12 13:32:46 2008
New Revision: 611488

URL: http://svn.apache.org/viewvc?rev=611488&view=rev
Log:
HADOOP-2548 Make TableMap and TableReduce generic
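What the patch means for code that subclasses these classes: TableMap and TableReduce now carry key/value type parameters, their map()/reduce() methods take the stock org.apache.hadoop.mapred.OutputCollector, and the TableOutputCollector wrapper (plus the boilerplate no-arg constructors it forced on subclasses) goes away. As a minimal sketch only, built from the signatures visible in the diff below, a custom mapper would now look roughly like this; the class name MyTableMap is made up and not part of the patch:

    import java.io.IOException;

    import org.apache.hadoop.hbase.HStoreKey;
    import org.apache.hadoop.hbase.mapred.TableMap;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.Reporter;

    /** Hypothetical subclass, shown only to illustrate the new generic signatures. */
    public class MyTableMap extends TableMap<Text, MapWritable> {
      @Override
      public void map(HStoreKey key, MapWritable value,
          OutputCollector<Text, MapWritable> output, Reporter reporter)
          throws IOException {
        // Emit the row key and the whole record unchanged, the same pattern
        // IdentityTableMap uses; a real mapper would transform value here.
        output.collect(key.getRow(), value);
      }
    }

The reducer side is symmetrical: subclasses extend TableReduce<K, V> and implement reduce(K, Iterator<V>, OutputCollector<Text, MapWritable>, Reporter), as IdentityTableReduce does below.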
Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Jan 12 13:32:46 2008
@@ -164,6 +164,8 @@
    HADOOP-2450 Show version (and svn revision) in hbase web ui
    HADOOP-2472 Range selection using filter (Edward Yoon via Stack)
    HADOOP-2553 Don't make Long objects calculating hbase type hash codes
+   HADOOP-2548 Make TableMap and TableReduce generic
+               (Frederik Hedberg via Stack)

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java Sat Jan 12 13:32:46 2008
@@ -27,18 +27,18 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Extract grouping columns from input record
  */
-public class GroupingTableMap extends TableMap {
+public class GroupingTableMap extends TableMap<Text,MapWritable> {
 
   /**
    * JobConf parameter to specify the columns used to produce the key passed to
@@ -49,11 +49,6 @@
 
   protected Text[] m_columns;
 
-  /** default constructor */
-  public GroupingTableMap() {
-    super();
-  }
-
   /**
    * Use this before submitting a TableMap job. It will appropriately set up the
   * JobConf.
@@ -65,6 +60,7 @@
    * @param mapper map class
    * @param job job configuration object
    */
+  @SuppressWarnings("unchecked")
  public static void initJob(String table, String columns, String groupColumns,
    Class<? extends TableMap> mapper, JobConf job) {
 
@@ -89,11 +85,11 @@
   * Pass the new key and value to reduce.
   * If any of the grouping columns are not found in the value, the record is skipped.
   *
-   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
   */
  @Override
  public void map(@SuppressWarnings("unused") HStoreKey key,
-    MapWritable value, TableOutputCollector output,
+    MapWritable value, OutputCollector<Text,MapWritable> output,
    @SuppressWarnings("unused") Reporter reporter) throws IOException {
 
    byte[][] keyVals = extractKeyValues(value);

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java Sat Jan 12 13:32:46 2008
@@ -24,13 +24,14 @@
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Pass the given key and record as-is to reduce
  */
-public class IdentityTableMap extends TableMap {
+public class IdentityTableMap extends TableMap<Text, MapWritable> {
 
   /** constructor */
   public IdentityTableMap() {
@@ -40,11 +41,11 @@
   /**
   * Pass the key, value to reduce
   *
-   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
   */
  @Override
  public void map(HStoreKey key, MapWritable value,
-    TableOutputCollector output,
+    OutputCollector<Text,MapWritable> output,
    @SuppressWarnings("unused") Reporter reporter) throws IOException {
 
    Text tKey = key.getRow();

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java Sat Jan 12 13:32:46 2008
@@ -24,28 +24,23 @@
 
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 /**
  * Write to table each key, record pair
  */
-public class IdentityTableReduce extends TableReduce {
-
-  /** constructor */
-  public IdentityTableReduce() {
-    super();
-  }
-
+public class IdentityTableReduce extends TableReduce<Text, MapWritable> {
   /**
   * No aggregation, output pairs of (key, record)
   *
-   * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.Text, java.util.Iterator, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
   */
  @Override
-  public void reduce(Text key, @SuppressWarnings("unchecked") Iterator values,
-    TableOutputCollector output,
-    @SuppressWarnings("unused") Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<MapWritable> values,
+    OutputCollector<Text, MapWritable> output, Reporter reporter)
+    throws IOException {
 
    while(values.hasNext()) {
      MapWritable r = (MapWritable)values.next();

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java Sat Jan 12 13:32:46 2008
@@ -21,6 +21,8 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -30,8 +32,6 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HStoreKey;
 
 /**
  * Scan an HBase table to sort by a specified sort column.
@@ -39,14 +39,8 @@
  *
  */
 @SuppressWarnings("unchecked")
-public abstract class TableMap extends MapReduceBase implements Mapper {
-  private TableOutputCollector m_collector;
-
-  /** constructor*/
-  public TableMap() {
-    m_collector = new TableOutputCollector();
-  }
-
+public abstract class TableMap<K extends WritableComparable, V extends Writable>
+  extends MapReduceBase implements Mapper<HStoreKey, MapWritable, K, V> {
  /**
   * Use this before submitting a TableMap job. It will
   * appropriately set up the JobConf.
@@ -56,9 +50,8 @@
   * @param mapper mapper class
   * @param job job configuration
   */
-  public static void initJob(String table, String columns,
+  public static void initJob(String table, String columns,
    Class<? extends TableMap> mapper, JobConf job) {
-
    job.setInputFormat(TableInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(MapWritable.class);
@@ -68,27 +61,6 @@
  }
 
  /**
-   * Input:
-   * @param key is of type HStoreKey
-   * @param value is of type KeyedDataArrayWritable
-   * @param output output collector
-   * @param reporter object to use for status updates
-   * @throws IOException
-   *
-   * Output:
-   * The key is a specific column, including the input key or any value
-   * The value is of type LabeledData
-   */
-  public void map(WritableComparable key, Writable value,
-    OutputCollector output, Reporter reporter) throws IOException {
-
-    if(m_collector.collector == null) {
-      m_collector.collector = output;
-    }
-    map((HStoreKey)key, (MapWritable)value, m_collector, reporter);
-  }
-
-  /**
   * Call a user defined function on a single HBase record, represented
   * by a key and its associated record value.
   *
@@ -98,6 +70,6 @@
   * @param reporter
   * @throws IOException
   */
-  public abstract void map(HStoreKey key, MapWritable value,
-    TableOutputCollector output, Reporter reporter) throws IOException;
+  public abstract void map(HStoreKey key, MapWritable value,
+    OutputCollector<K, V> output, Reporter reporter) throws IOException;
 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java Sat Jan 12 13:32:46 2008
@@ -1,47 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.OutputCollector;
-
-/**
- * Refine the types that can be collected from a Table Map/Reduce jobs.
- */
-public class TableOutputCollector {
-  /** The collector object */
-  @SuppressWarnings("unchecked")
-  public OutputCollector collector;
-
-  /**
-   * Restrict Table Map/Reduce's output to be a Text key and a record.
-   *
-   * @param key
-   * @param value
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  public void collect(Text key, MapWritable value) throws IOException {
-    collector.collect(key, value);
-  }
-}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java Sat Jan 12 13:32:46 2008
@@ -22,8 +22,10 @@
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -34,14 +36,8 @@
 * Write a table, sorting by the input key
 */
 @SuppressWarnings("unchecked")
-public abstract class TableReduce extends MapReduceBase implements Reducer {
-  TableOutputCollector m_collector;
-
-  /** Constructor */
-  public TableReduce() {
-    m_collector = new TableOutputCollector();
-  }
-
+public abstract class TableReduce<K extends WritableComparable, V extends Writable>
+  extends MapReduceBase implements Reducer<K, V, Text, MapWritable> {
  /**
   * Use this before submitting a TableReduce job. It will
   * appropriately set up the JobConf.
@@ -50,31 +46,14 @@
   * @param reducer
   * @param job
   */
-  public static void initJob(String table, Class<? extends TableReduce> reducer,
-    JobConf job) {
-
+  public static void initJob(String table,
+    Class<? extends TableReduce> reducer, JobConf job) {
    job.setOutputFormat(TableOutputFormat.class);
    job.setReducerClass(reducer);
    job.set(TableOutputFormat.OUTPUT_TABLE, table);
  }
 
  /**
-   * Create a unique key for table insertion by appending a local
-   * counter the given key.
-   *
-   * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
-   */
-  @SuppressWarnings("unchecked")
-  public void reduce(WritableComparable key, Iterator values,
-    OutputCollector output, Reporter reporter) throws IOException {
-
-    if(m_collector.collector == null) {
-      m_collector.collector = output;
-    }
-    reduce((Text)key, values, m_collector, reporter);
-  }
-
-  /**
   *
   * @param key
   * @param values
   * @param output
   * @param reporter
   * @throws IOException
   */
-  @SuppressWarnings("unchecked")
-  public abstract void reduce(Text key, Iterator values,
-    TableOutputCollector output, Reporter reporter) throws IOException;
-
+  public abstract void reduce(K key, Iterator<V> values,
+    OutputCollector<Text, MapWritable> output, Reporter reporter)
+    throws IOException;
 }

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Sat Jan 12 13:32:46 2008
@@ -46,6 +46,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
 
 /**
  * Test Map/Reduce job over HBase tables
@@ -147,13 +148,7 @@
  /**
   * Pass the given key and processed record reduce
   */
-  public static class ProcessContentsMapper extends TableMap {
-
-    /** constructor */
-    public ProcessContentsMapper() {
-      super();
-    }
-
+  public static class ProcessContentsMapper extends TableMap<Text, MapWritable> {
    /**
     * Pass the key, and reversed value to reduce
     *
@@ -162,7 +157,7 @@
    @SuppressWarnings("unchecked")
    @Override
    public void map(HStoreKey key, MapWritable value,
-      TableOutputCollector output,
+      OutputCollector<Text, MapWritable> output,
      @SuppressWarnings("unused") Reporter reporter) throws IOException {
 
      Text tKey = key.getRow();
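For completeness, a sketch of how a job might be wired against the patched helpers. The initJob signatures themselves are untouched by this change, so existing drivers should keep compiling; only the mapper and reducer subclasses pick up type parameters. The driver class, the table name "webtable" and the column "contents:" below are invented for illustration and are not part of the patch:

    import org.apache.hadoop.hbase.mapred.GroupingTableMap;
    import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
    import org.apache.hadoop.hbase.mapred.TableReduce;
    import org.apache.hadoop.mapred.JobClient;
    import org.apache.hadoop.mapred.JobConf;

    // Hypothetical driver: keys each row of the made-up table "webtable" by its
    // "contents:" column via GroupingTableMap, then writes the records back out
    // with IdentityTableReduce.
    public class ExampleDriver {
      public static void main(String[] args) throws Exception {
        JobConf jobConf = new JobConf(ExampleDriver.class);
        jobConf.setJobName("grouping scan over webtable");
        jobConf.setNumMapTasks(1);
        jobConf.setNumReduceTasks(1);
        // Same helper calls as before the patch.
        GroupingTableMap.initJob("webtable", "contents:", "contents:",
            GroupingTableMap.class, jobConf);
        TableReduce.initJob("webtable", IdentityTableReduce.class, jobConf);
        JobClient.runJob(jobConf);
      }
    }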