Author: stack
Date: Sat Jan 12 13:32:46 2008
New Revision: 611488

URL: http://svn.apache.org/viewvc?rev=611488&view=rev
Log:
HADOOP-2548 Make TableMap and TableReduce generic
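The upshot of the patch, sketched below: TableMap and TableReduce now carry type parameters for the map output key and value, and subclasses implement map()/reduce() against a typed OutputCollector instead of the removed TableOutputCollector wrapper. The class below is a hypothetical illustration written against the new signatures, not part of this commit:

import java.io.IOException;

import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Hypothetical mapper: emits each row key and its record unchanged. */
public class ExampleTableMap extends TableMap<Text, MapWritable> {
  @Override
  public void map(HStoreKey key, MapWritable value,
      OutputCollector<Text, MapWritable> output, Reporter reporter)
      throws IOException {
    // The collector is typed by the class-level parameters <Text, MapWritable>.
    output.collect(key.getRow(), value);
  }
}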

Modified:
    lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java
    lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
    lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java

Modified: lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/CHANGES.txt Sat Jan 12 13:32:46 2008
@@ -164,6 +164,8 @@
    HADOOP-2450 Show version (and svn revision) in hbase web ui
    HADOOP-2472 Range selection using filter (Edward Yoon via Stack)
    HADOOP-2553 Don't make Long objects calculating hbase type hash codes
+   HADOOP-2548 Make TableMap and TableReduce generic
+               (Frederik Hedberg via Stack)
                
 
 

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/GroupingTableMap.java Sat Jan 12 13:32:46 2008
@@ -27,18 +27,18 @@
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 
 /**
  * Extract grouping columns from input record
  */
-public class GroupingTableMap extends TableMap {
+public class GroupingTableMap extends TableMap<Text,MapWritable> {
 
   /**
    * JobConf parameter to specify the columns used to produce the key passed to 
@@ -49,11 +49,6 @@
   
   protected Text[] m_columns;
 
-  /** default constructor */
-  public GroupingTableMap() {
-    super();
-  }
-
   /**
    * Use this before submitting a TableMap job. It will appropriately set up the
    * JobConf.
@@ -65,6 +60,7 @@
    * @param mapper map class
    * @param job job configuration object
    */
+  @SuppressWarnings("unchecked")
   public static void initJob(String table, String columns, String groupColumns, 
       Class<? extends TableMap> mapper, JobConf job) {
     
@@ -89,11 +85,11 @@
    * Pass the new key and value to reduce.
    * If any of the grouping columns are not found in the value, the record is 
skipped.
    *
-   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
    */
   @Override
   public void map(@SuppressWarnings("unused") HStoreKey key,
-      MapWritable value, TableOutputCollector output,
+      MapWritable value, OutputCollector<Text,MapWritable> output,
       @SuppressWarnings("unused") Reporter reporter) throws IOException {
     
     byte[][] keyVals = extractKeyValues(value);

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableMap.java Sat Jan 12 13:32:46 2008
@@ -24,13 +24,14 @@
 import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 
 /**
  * Pass the given key and record as-is to reduce
  */
-public class IdentityTableMap extends TableMap {
+public class IdentityTableMap extends TableMap<Text, MapWritable> {
 
   /** constructor */
   public IdentityTableMap() {
@@ -40,11 +41,11 @@
   /**
    * Pass the key, value to reduce
    *
-   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableMap#map(org.apache.hadoop.hbase.HStoreKey, org.apache.hadoop.io.MapWritable, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
    */
   @Override
   public void map(HStoreKey key, MapWritable value,
-      TableOutputCollector output,
+      OutputCollector<Text,MapWritable> output,
       @SuppressWarnings("unused") Reporter reporter) throws IOException {
     
     Text tKey = key.getRow();

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/IdentityTableReduce.java Sat Jan 12 13:32:46 2008
@@ -24,28 +24,23 @@
 
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
 
 
 /**
  * Write to table each key, record pair
  */
-public class IdentityTableReduce extends TableReduce {
-
-  /** constructor */
-  public IdentityTableReduce() {
-    super();
-  }
-
+public class IdentityTableReduce extends TableReduce<Text, MapWritable> {
   /**
    * No aggregation, output pairs of (key, record)
    *
-   * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.Text, java.util.Iterator, org.apache.hadoop.hbase.mapred.TableOutputCollector, org.apache.hadoop.mapred.Reporter)
+   * @see org.apache.hadoop.hbase.mapred.TableReduce#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
    */
   @Override
-  public void reduce(Text key, @SuppressWarnings("unchecked") Iterator values,
-      TableOutputCollector output,
-      @SuppressWarnings("unused") Reporter reporter) throws IOException {
+  public void reduce(Text key, Iterator<MapWritable> values,
+      OutputCollector<Text, MapWritable> output, Reporter reporter)
+      throws IOException {
     
     while(values.hasNext()) {
       MapWritable r = (MapWritable)values.next();

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableMap.java Sat Jan 12 13:32:46 2008
@@ -21,6 +21,8 @@
 
 import java.io.IOException;
 
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HStoreKey;
 import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
@@ -30,8 +32,6 @@
 import org.apache.hadoop.mapred.Mapper;
 import org.apache.hadoop.mapred.OutputCollector;
 import org.apache.hadoop.mapred.Reporter;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HStoreKey;
 
 /**
  * Scan an HBase table to sort by a specified sort column.
@@ -39,14 +39,8 @@
  *
  */
 @SuppressWarnings("unchecked")
-public abstract class TableMap extends MapReduceBase implements Mapper {
-  private TableOutputCollector m_collector;
-
-  /** constructor*/
-  public TableMap() {
-    m_collector = new TableOutputCollector();
-  }
-
+public abstract class TableMap<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Mapper<HStoreKey, MapWritable, K, V> {
   /**
    * Use this before submitting a TableMap job. It will
    * appropriately set up the JobConf.
@@ -56,9 +50,8 @@
    * @param mapper mapper class
    * @param job job configuration
    */
-  public static void initJob(String table, String columns, 
+  public static void initJob(String table, String columns,
       Class<? extends TableMap> mapper, JobConf job) {
-    
     job.setInputFormat(TableInputFormat.class);
     job.setOutputKeyClass(Text.class);
     job.setOutputValueClass(MapWritable.class);
@@ -68,27 +61,6 @@
   }
 
   /**
-   * Input:
-   * @param key is of type HStoreKey
-   * @param value is of type KeyedDataArrayWritable
-   * @param output output collector
-   * @param reporter object to use for status updates
-   * @throws IOException
-   * 
-   * Output:
-   * The key is a specific column, including the input key or any value
-   * The value is of type LabeledData
-   */
-  public void map(WritableComparable key, Writable value,
-      OutputCollector output, Reporter reporter) throws IOException {
-    
-    if(m_collector.collector == null) {
-      m_collector.collector = output;
-    }
-    map((HStoreKey)key, (MapWritable)value, m_collector, reporter);
-  }
-
-  /**
    * Call a user defined function on a single HBase record, represented
    * by a key and its associated record value.
    * 
@@ -98,6 +70,6 @@
    * @param reporter
    * @throws IOException
    */
-  public abstract void map(HStoreKey key, MapWritable value, 
-      TableOutputCollector output, Reporter reporter) throws IOException;
+  public abstract void map(HStoreKey key, MapWritable value,
+      OutputCollector<K, V> output, Reporter reporter) throws IOException;
 }
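
With the old untyped bridge map() and the m_collector field gone, subclasses now override the typed map() directly; job setup through initJob is unchanged by this patch. A minimal wiring sketch (driver class, table name, and column family are made up, and ExampleTableMap is the hypothetical mapper sketched under the log message above):

import org.apache.hadoop.hbase.mapred.IdentityTableReduce;
import org.apache.hadoop.hbase.mapred.TableMap;
import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class ExampleDriver {
  public static void main(String[] args) throws Exception {
    JobConf job = new JobConf(ExampleDriver.class);
    // Scan the (made-up) "webtable" table, handing the "contents:" family to the mapper.
    TableMap.initJob("webtable", "contents:", ExampleTableMap.class, job);
    // Write the reduce output back into the same table.
    TableReduce.initJob("webtable", IdentityTableReduce.class, job);
    JobClient.runJob(job);
  }
}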

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableOutputCollector.java Sat Jan 12 13:32:46 2008
@@ -1,47 +0,0 @@
-/**
- * Copyright 2007 The Apache Software Foundation
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.io.MapWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.OutputCollector;
-
-/**
- * Refine the types that can be collected from a Table Map/Reduce jobs.
- */
-public class TableOutputCollector {
-  /** The collector object */
-  @SuppressWarnings("unchecked")
-  public OutputCollector collector;
-
-  /**
-   * Restrict Table Map/Reduce's output to be a Text key and a record.
-   * 
-   * @param key
-   * @param value
-   * @throws IOException
-   */
-  @SuppressWarnings("unchecked")
-  public void collect(Text key, MapWritable value) throws IOException {
-    collector.collect(key, value);
-  }
-}

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/java/org/apache/hadoop/hbase/mapred/TableReduce.java Sat Jan 12 13:32:46 2008
@@ -22,8 +22,10 @@
 import java.io.IOException;
 import java.util.Iterator;
 
-import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.MapWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MapReduceBase;
 import org.apache.hadoop.mapred.OutputCollector;
@@ -34,14 +36,8 @@
  * Write a table, sorting by the input key
  */
 @SuppressWarnings("unchecked")
-public abstract class TableReduce extends MapReduceBase implements Reducer {
-  TableOutputCollector m_collector;
-
-  /** Constructor */
-  public TableReduce() {
-    m_collector = new TableOutputCollector();
-  }
-
+public abstract class TableReduce<K extends WritableComparable, V extends Writable>
+    extends MapReduceBase implements Reducer<K, V, Text, MapWritable> {
   /**
    * Use this before submitting a TableReduce job. It will
    * appropriately set up the JobConf.
@@ -50,31 +46,14 @@
    * @param reducer
    * @param job
    */
-  public static void initJob(String table, Class<? extends TableReduce> reducer,
-      JobConf job) {
-    
+  public static void initJob(String table,
+      Class<? extends TableReduce> reducer, JobConf job) {
     job.setOutputFormat(TableOutputFormat.class);
     job.setReducerClass(reducer);
     job.set(TableOutputFormat.OUTPUT_TABLE, table);
   }
 
   /**
-   * Create a unique key for table insertion by appending a local
-   * counter the given key.
-   *
-   * @see org.apache.hadoop.mapred.Reducer#reduce(org.apache.hadoop.io.WritableComparable, java.util.Iterator, org.apache.hadoop.mapred.OutputCollector, org.apache.hadoop.mapred.Reporter)
-   */
-  @SuppressWarnings("unchecked")
-  public void reduce(WritableComparable key, Iterator values,
-      OutputCollector output, Reporter reporter) throws IOException {
-
-    if(m_collector.collector == null) {
-      m_collector.collector = output;
-    }
-    reduce((Text)key, values, m_collector, reporter);
-  }
-
-  /**
    * 
    * @param key
    * @param values
@@ -82,8 +61,7 @@
    * @param reporter
    * @throws IOException
    */
-  @SuppressWarnings("unchecked")
-  public abstract void reduce(Text key, Iterator values, 
-      TableOutputCollector output, Reporter reporter) throws IOException;
-
+  public abstract void reduce(K key, Iterator<V> values,
+      OutputCollector<Text, MapWritable> output, Reporter reporter)
+      throws IOException;
 }
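
Correspondingly, a reducer now extends the parameterized TableReduce and receives a typed Iterator and OutputCollector, so the cast through TableOutputCollector is no longer needed. A hypothetical example (class name and behavior invented for illustration) that keeps only the first record per key:

import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.hbase.mapred.TableReduce;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

/** Hypothetical reducer: writes only the first record seen for each key. */
public class FirstValueTableReduce extends TableReduce<Text, MapWritable> {
  @Override
  public void reduce(Text key, Iterator<MapWritable> values,
      OutputCollector<Text, MapWritable> output, Reporter reporter)
      throws IOException {
    if (values.hasNext()) {
      // No cast required; the iterator is typed by the value parameter.
      output.collect(key, values.next());
    }
  }
}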

Modified: lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java?rev=611488&r1=611487&r2=611488&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java (original)
+++ lucene/hadoop/trunk/src/contrib/hbase/src/test/org/apache/hadoop/hbase/mapred/TestTableMapReduce.java Sat Jan 12 13:32:46 2008
@@ -46,6 +46,7 @@
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.MiniMRCluster;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.OutputCollector;
 
 /**
  * Test Map/Reduce job over HBase tables
@@ -147,13 +148,7 @@
   /**
    * Pass the given key and processed record reduce
    */
-  public static class ProcessContentsMapper extends TableMap {
-
-    /** constructor */
-    public ProcessContentsMapper() {
-      super();
-    }
-
+  public static class ProcessContentsMapper extends TableMap<Text, MapWritable> {
     /**
      * Pass the key, and reversed value to reduce
      *
@@ -162,7 +157,7 @@
     @SuppressWarnings("unchecked")
     @Override
     public void map(HStoreKey key, MapWritable value,
-        TableOutputCollector output,
+        OutputCollector<Text, MapWritable> output,
         @SuppressWarnings("unused") Reporter reporter) throws IOException {
       
       Text tKey = key.getRow();

