Hi,

Hbase/Hadoop Setup:
1. 3 regionservers 
2. Run the task using 20 Map Tasks and 20 Reduce Tasks. 
3. Using an older hbase version from the trunk [ Version: 0.20.0-dev, r786695, 
Sat Jun 20 18:01:17 EDT 2009 ]
4. Using hadoop [ 0.20.0 ]

Test Data:
1. The input is a CSV file with a 1M rows and about 20 columns and 4 metrics. 
2. Output is 4 hbase tables "txn_m1", "txn_m2", "txn_m3", "txn_m4". 

The task is to parse through the CSV file and for each metric m1 create an 
entry into the hbase table "txn_m1" with the columns as needed. Attached is an 
pdf [from an excel] which explains how a single row in the CSV is converted 
into hbase data in the mapper and reducer stage. Attached is the code as well.

For processing a 1M records, it is taking about 38 minutes. I am using 
HTable.incrementColumnValue() in the reduce pass to create the records in the 
hbase tables. 

Is there anything I should be doing differently or inherently incorrect? I 
would like run this task in 1 minute.

Thanks for the help,
Irfan

Here is the output of the process. Let me know if I should attach any other 
log. 

09/07/02 15:19:11 INFO mapred.JobClient: Running job: job_200906192236_5114
09/07/02 15:19:12 INFO mapred.JobClient:  map 0% reduce 0%
09/07/02 15:19:29 INFO mapred.JobClient:  map 30% reduce 0%
09/07/02 15:19:32 INFO mapred.JobClient:  map 46% reduce 0%
09/07/02 15:19:35 INFO mapred.JobClient:  map 64% reduce 0%
09/07/02 15:19:38 INFO mapred.JobClient:  map 75% reduce 0%
09/07/02 15:19:44 INFO mapred.JobClient:  map 76% reduce 0%
09/07/02 15:19:47 INFO mapred.JobClient:  map 99% reduce 1%
09/07/02 15:19:50 INFO mapred.JobClient:  map 100% reduce 3%
09/07/02 15:19:53 INFO mapred.JobClient:  map 100% reduce 4%
09/07/02 15:19:56 INFO mapred.JobClient:  map 100% reduce 10%
09/07/02 15:19:59 INFO mapred.JobClient:  map 100% reduce 12%
09/07/02 15:20:02 INFO mapred.JobClient:  map 100% reduce 16%
09/07/02 15:20:05 INFO mapred.JobClient:  map 100% reduce 25%
09/07/02 15:20:08 INFO mapred.JobClient:  map 100% reduce 33%
09/07/02 15:20:11 INFO mapred.JobClient:  map 100% reduce 36%
09/07/02 15:20:14 INFO mapred.JobClient:  map 100% reduce 39%
09/07/02 15:20:17 INFO mapred.JobClient:  map 100% reduce 41%
09/07/02 15:20:29 INFO mapred.JobClient:  map 100% reduce 42%
09/07/02 15:20:32 INFO mapred.JobClient:  map 100% reduce 44%
09/07/02 15:20:38 INFO mapred.JobClient:  map 100% reduce 46%
09/07/02 15:20:49 INFO mapred.JobClient:  map 100% reduce 47%
09/07/02 15:20:55 INFO mapred.JobClient:  map 100% reduce 50%
09/07/02 15:21:01 INFO mapred.JobClient:  map 100% reduce 51%
09/07/02 15:21:34 INFO mapred.JobClient:  map 100% reduce 52%
09/07/02 15:21:39 INFO mapred.JobClient:  map 100% reduce 53%
09/07/02 15:22:06 INFO mapred.JobClient:  map 100% reduce 54%
09/07/02 15:22:28 INFO mapred.JobClient:  map 100% reduce 55%
09/07/02 15:22:44 INFO mapred.JobClient:  map 100% reduce 56%
09/07/02 15:23:02 INFO mapred.JobClient: Task Id : 
attempt_200906192236_5114_r_000002_0, Status : FAILED
attempt_200906192236_5114_r_000002_0: [2009-07-02 15:20:27.230] fetching new 
record writer ...
attempt_200906192236_5114_r_000002_0: [2009-07-02 15:22:51.429] failed to 
initialize the hbase configuration
09/07/02 15:23:08 INFO mapred.JobClient:  map 100% reduce 53%
09/07/02 15:23:08 INFO mapred.JobClient: Task Id : 
attempt_200906192236_5114_r_000013_0, Status : FAILED
org.apache.hadoop.hbase.client.NoServerForRegionException: Timed out trying to 
locate root region
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRootRegion(HConnectionManager.java:863)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:514)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:523)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:527)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:490)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:124)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:107)
        at 
com.qwapi.txnload.LoadMultipleCubes$CubeOutputFormat.getRecordWriter(LoadMultipleCubes.java:442)
        at 
org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:435)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:413)
        at org.apache.hadoop.mapred.Child.main(Child.java:170)

attempt_200906192236_5114_r_000013_0: [2009-07-02 15:20:33.183] fetching new 
record writer ...
attempt_200906192236_5114_r_000013_0: [2009-07-02 15:23:04.369] failed to 
initialize the hbase configuration
09/07/02 15:23:09 INFO mapred.JobClient:  map 100% reduce 50%
09/07/02 15:23:14 INFO mapred.JobClient: Task Id : 
attempt_200906192236_5114_r_000012_0, Status : FAILED
org.apache.hadoop.hbase.client.NoServerForRegionException: Timed out trying to 
locate root region
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRootRegion(HConnectionManager.java:863)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:514)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:523)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:527)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:490)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:124)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:107)
        at 
com.qwapi.txnload.LoadMultipleCubes$CubeOutputFormat.getRecordWriter(LoadMultipleCubes.java:442)
        at 
org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:435)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:413)
        at org.apache.hadoop.mapred.Child.main(Child.java:170)

attempt_200906192236_5114_r_000012_0: [2009-07-02 15:20:48.434] fetching new 
record writer ...
attempt_200906192236_5114_r_000012_0: [2009-07-02 15:23:10.185] failed to 
initialize the hbase configuration
09/07/02 15:23:15 INFO mapred.JobClient:  map 100% reduce 48%
09/07/02 15:23:17 INFO mapred.JobClient: Task Id : 
attempt_200906192236_5114_r_000014_0, Status : FAILED
org.apache.hadoop.hbase.client.NoServerForRegionException: Timed out trying to 
locate root region
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRootRegion(HConnectionManager.java:863)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:514)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:523)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:527)
        at 
org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:490)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:124)
        at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:107)
        at 
com.qwapi.txnload.LoadMultipleCubes$CubeOutputFormat.getRecordWriter(LoadMultipleCubes.java:442)
        at 
org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:435)
        at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:413)
        at org.apache.hadoop.mapred.Child.main(Child.java:170)

attempt_200906192236_5114_r_000014_0: [2009-07-02 15:20:47.442] fetching new 
record writer ...
attempt_200906192236_5114_r_000014_0: [2009-07-02 15:23:13.285] failed to 
initialize the hbase configuration
09/07/02 15:23:18 INFO mapred.JobClient:  map 100% reduce 45%
09/07/02 15:23:21 INFO mapred.JobClient:  map 100% reduce 46%
09/07/02 15:23:29 INFO mapred.JobClient:  map 100% reduce 47%
09/07/02 15:23:32 INFO mapred.JobClient:  map 100% reduce 48%
09/07/02 15:23:36 INFO mapred.JobClient:  map 100% reduce 49%
09/07/02 15:23:39 INFO mapred.JobClient:  map 100% reduce 51%
09/07/02 15:23:42 INFO mapred.JobClient:  map 100% reduce 56%
09/07/02 15:23:45 INFO mapred.JobClient:  map 100% reduce 58%
09/07/02 15:24:20 INFO mapred.JobClient:  map 100% reduce 59%
09/07/02 15:25:11 INFO mapred.JobClient:  map 100% reduce 60%
09/07/02 15:25:17 INFO mapred.JobClient:  map 100% reduce 61%
09/07/02 15:25:26 INFO mapred.JobClient:  map 100% reduce 62%
09/07/02 15:25:32 INFO mapred.JobClient:  map 100% reduce 64%
09/07/02 15:25:38 INFO mapred.JobClient:  map 100% reduce 65%
09/07/02 15:26:20 INFO mapred.JobClient:  map 100% reduce 66%
09/07/02 15:26:40 INFO mapred.JobClient:  map 100% reduce 67%
09/07/02 15:26:48 INFO mapred.JobClient:  map 100% reduce 68%
09/07/02 15:27:16 INFO mapred.JobClient:  map 100% reduce 69%
09/07/02 15:27:21 INFO mapred.JobClient:  map 100% reduce 70%
09/07/02 15:27:46 INFO mapred.JobClient:  map 100% reduce 71%
09/07/02 15:28:25 INFO mapred.JobClient:  map 100% reduce 72%
09/07/02 15:28:46 INFO mapred.JobClient:  map 100% reduce 73%
09/07/02 15:29:08 INFO mapred.JobClient:  map 100% reduce 74%
09/07/02 15:29:45 INFO mapred.JobClient:  map 100% reduce 76%
09/07/02 15:30:42 INFO mapred.JobClient:  map 100% reduce 77%
09/07/02 15:31:06 INFO mapred.JobClient:  map 100% reduce 78%
09/07/02 15:31:12 INFO mapred.JobClient:  map 100% reduce 79%
09/07/02 15:31:36 INFO mapred.JobClient:  map 100% reduce 81%
09/07/02 15:31:37 INFO mapred.JobClient:  map 100% reduce 82%
09/07/02 15:32:00 INFO mapred.JobClient:  map 100% reduce 83%
09/07/02 15:32:09 INFO mapred.JobClient:  map 100% reduce 84%
09/07/02 15:32:30 INFO mapred.JobClient:  map 100% reduce 86%
09/07/02 15:38:42 INFO mapred.JobClient:  map 100% reduce 88%
09/07/02 15:39:49 INFO mapred.JobClient:  map 100% reduce 89%
09/07/02 15:41:13 INFO mapred.JobClient:  map 100% reduce 90%
09/07/02 15:41:16 INFO mapred.JobClient:  map 100% reduce 91%
09/07/02 15:41:28 INFO mapred.JobClient:  map 100% reduce 93%
09/07/02 15:44:34 INFO mapred.JobClient:  map 100% reduce 94%
09/07/02 15:45:41 INFO mapred.JobClient:  map 100% reduce 95%
09/07/02 15:45:50 INFO mapred.JobClient:  map 100% reduce 96%
09/07/02 15:46:17 INFO mapred.JobClient:  map 100% reduce 98%
09/07/02 15:55:29 INFO mapred.JobClient:  map 100% reduce 99%
09/07/02 15:57:08 INFO mapred.JobClient:  map 100% reduce 100%
09/07/02 15:57:14 INFO mapred.JobClient: Job complete: job_200906192236_5114
09/07/02 15:57:14 INFO mapred.JobClient: Counters: 18
09/07/02 15:57:14 INFO mapred.JobClient:   Job Counters 
09/07/02 15:57:14 INFO mapred.JobClient:     Launched reduce tasks=24
09/07/02 15:57:14 INFO mapred.JobClient:     Rack-local map tasks=2
09/07/02 15:57:14 INFO mapred.JobClient:     Launched map tasks=20
09/07/02 15:57:14 INFO mapred.JobClient:     Data-local map tasks=18
09/07/02 15:57:14 INFO mapred.JobClient:   FileSystemCounters
09/07/02 15:57:14 INFO mapred.JobClient:     FILE_BYTES_READ=1848609562
09/07/02 15:57:14 INFO mapred.JobClient:     HDFS_BYTES_READ=57982980
09/07/02 15:57:14 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=2768325646
09/07/02 15:57:14 INFO mapred.JobClient:   Map-Reduce Framework
09/07/02 15:57:14 INFO mapred.JobClient:     Reduce input groups=4863
09/07/02 15:57:14 INFO mapred.JobClient:     Combine output records=0
09/07/02 15:57:14 INFO mapred.JobClient:     Map input records=294786
09/07/02 15:57:14 INFO mapred.JobClient:     Reduce shuffle bytes=883803390
09/07/02 15:57:14 INFO mapred.JobClient:     Reduce output records=0
09/07/02 15:57:14 INFO mapred.JobClient:     Spilled Records=50956464
09/07/02 15:57:14 INFO mapred.JobClient:     Map output bytes=888797024
09/07/02 15:57:14 INFO mapred.JobClient:     Map input bytes=57966580
09/07/02 15:57:14 INFO mapred.JobClient:     Combine input records=0
09/07/02 15:57:14 INFO mapred.JobClient:     Map output records=16985488
09/07/02 15:57:14 INFO mapred.JobClient:     Reduce input records=16985488

import java.io.DataOutputStream;
import java.io.IOException;
import java.text.DateFormat;
import java.text.MessageFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.Progressable;

import com.qwapi.txnload.util.LogService;

/**
 * map/reduce job to load the data into the cubes.
 */
public class MultipleTablesTest extends HBaseClusterTestCase {
    /** logger for the MultipleTablesTest class */
    private static final LogService LOGGER = LogService.getLogService(MultipleTablesTest.class);

    /** empty string array */
    private static final String[] EMPTY_STRING_ARRAY = new String[0];

    /** date format output [yyyyMMddHH] */
    private static final SimpleDateFormat DATE_FORMAT_OUTPUT = new SimpleDateFormat("yyyyMMddHH");

    /** date format input [yyyy-MM-dd HH:mm:ss] */
    private static final SimpleDateFormat DATE_FORMAT_INPUT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    
    /** map of ordinal ==> column_family */
    private static final Map<Integer, String> COLUMNS = new HashMap<Integer, String>(50);

    private static final int[] ORDINALS = new int[] { 3, 4, };
    private static final String[] TABLE_NAMES = new String[] { "test_t1_m1", "test_t1_m2" };
    private static final String[] OUTPUTS = new String[] { "m1", "m2" };
    private static final Class<? extends TestTableOutputFormat>[] OUTPUT_FORMAT_CLASSES = new Class[] { M1OutputFormat.class, M2OutputFormat.class };
    
    /** row key format */
    private static final DateFormat ROW_KEY_FORMAT = new SimpleDateFormat("yyyyMMddHH");

    /** sid */
    private static final String FAM_SID = "sid";

    /** pid */
    private static final String FAM_PID = "pid";

    static {
        COLUMNS.put(1, FAM_SID + ":");
        COLUMNS.put(2, FAM_PID + ":");
    }

    /** input data */
    private static final Object[][] INPUT_DATA = new Object[][] {
            { "2009-06-12 12:50:08", 100, 1000, 1, 2, },
            { "2009-06-12 12:54:08", 100, 1000, 13, 45, },
            { "2009-06-12 15:18:08", 100, 2000, 2, 3, },
            { "2009-06-13 16:50:08", 100, 2000, 12, 83, },
            { "2009-06-13 18:50:08", 200, 2000, 8, 5, },
            { "2009-06-12 23:50:08", 200, 1000, 2, 3, },
    };
    
    /** 
     * creates an instance of the map/reduce load test.
     */
    public MultipleTablesTest() {
    }

    /**
     * setup the test case components.
     * 
     * @throws java.lang.Exception exception setting up the test case
     */
    public void setUp() throws Exception {
    }

    /**
     * tear down the test case components.
     * 
     * @throws java.lang.Exception exception tearing down up the test case
     */
    public void tearDown() throws Exception {
    }
    
    private String[] getFamilies() {
        List<String> result = new ArrayList<String>(10);
        
        for (Map.Entry<Integer, String> entry : COLUMNS.entrySet()) {
            result.add(entry.getValue());
        }
        
        return result.toArray(EMPTY_STRING_ARRAY);
    }

    public void testLoadMultipleTables() throws Exception {
        for (int i1 = 0, n1 = TABLE_NAMES.length; n1 > 0; i1++, n1--) {
            String tableName = TABLE_NAMES[i1];
            createTable(tableName, getFamilies());
        }
        
        JobConf conf = new JobConf();
        conf.setBoolean("localFS", true);
        
        Path inDir = setupInputDirectory(conf);

        conf.setJobName("chain");
        conf.setInputFormat(TextInputFormat.class);
        
        conf.setOutputKeyClass(ImmutableBytesWritable.class);
        conf.setOutputValueClass(Put.class);
        
        conf.setMapOutputKeyClass(ImmutableBytesWritable.class);
        conf.setMapOutputValueClass(HbaseMapWritable.class);

        for (int j1 = 0, m1 = OUTPUTS.length; m1 > 0; j1++, m1--) {
            MultipleOutputs.addNamedOutput(conf, 
                                           OUTPUTS[j1], 
                                           OUTPUT_FORMAT_CLASSES[j1], 
                                           ImmutableBytesWritable.class, 
                                           Put.class); 
        }
        
        conf.setOutputFormat(M1OutputFormat.class);
        conf.setOutputKeyClass(ImmutableBytesWritable.class);
        conf.setOutputValueClass(Put.class);

        FileInputFormat.setInputPaths(conf, inDir);

        int numMapTasks = Integer.parseInt(System.getProperty("mapred.map.tasks", "10"));
        conf.setInt("mapred.map.tasks", numMapTasks);
        
        conf.setMapperClass(TableMapperBase.class);
        conf.setReducerClass(TableLoader.class);
        
        int numReduceTasks = Integer.parseInt(System.getProperty("mapred.reduce.tasks", "10"));
        conf.setNumReduceTasks(numReduceTasks);
        
        JobClient jc = new JobClient(conf);
        RunningJob job = jc.submitJob(conf);
        
        while (!job.isComplete()) {
            Thread.sleep(100);
        }
        
        for (int i1 = 0, n1 = TABLE_NAMES.length; n1 > 0; i1++, n1--) {
            queryMetrics(TABLE_NAMES[i1], 100, 1000, DATE_FORMAT_INPUT.parse("2009-06-12 12:50:00"));
        }
    }

    /**
     * Creates a temporary table in HBase, and loads data into it by reading the specified CSV file.
     * @param tableName The name of the table which should be created 
     * @param families The column families to be created in the table
     * @param csvFileName The CSV file from which data should be loaded. Can be null, in which case no data will be loaded.
     * @return the HTable instance that was created
     * @throws java.io.IOException if the table cannot be created, or the data cannot be loaded. 
     */
    private HTable createTable(String tableName, String[] families) throws IOException {
        /*
         * conf variable has been 
         */
        HBaseAdmin admin = new HBaseAdmin(conf);
        HTableDescriptor tableDescriptor =  new HTableDescriptor(tableName);
        
        /*
         * Add the columns to the table
         */
        for (int i = 0; i < families.length; i++) {
            tableDescriptor.addFamily(new HColumnDescriptor(families[i]));
        }
        
        if (admin.tableExists(tableName)) {
            admin.disableTable(tableName);
            admin.deleteTable(tableName);
        }

        if (!admin.tableExists(tableName)) {
            /*
             * Create the table.
             * Note - this table will not exist after the test case completes.
             */
            admin.createTable(tableDescriptor);
        }
        
        /*
         * Get a reference to the just created table.
         */
        HTable table = new HTable(conf, tableName);
        
        return table;
    }

    private Path setupInputDirectory(JobConf conf) throws IOException {
        Path inDir = new Path("testing/chain/input");

        String localPathRoot = System.getProperty("test.build.data", "/tmp").replace(' ', '+');
        inDir = new Path(localPathRoot, inDir);

        FileSystem fs = FileSystem.get(conf);

        if (!fs.mkdirs(inDir)) {
            throw new IOException("Mkdirs failed to create " + inDir.toString());
        }

        DataOutputStream file = fs.create(new Path(inDir, "part-0"));
        
        for (int i1 = 0, n1 = INPUT_DATA.length; n1 > 0; i1++, n1--) {
            for (int j1 = 0, m1 = INPUT_DATA[i1].length; m1 > 0; j1++, m1--) {
                file.writeBytes("\"" + String.valueOf(INPUT_DATA[i1][j1]) + "\"");
                
                if (m1 > 1) {
                    file.writeBytes(",");
                }
            }
            
            file.writeBytes("\n");
        }
        
        file.close();
        
        return inDir;
    }
    
    /**
     * sample method for querying the hbase tables.
     * 
     * @param table table to query against
     * @param publisherId id of the publisher to query
     * @param siteId id of the site to query
     * @param eventTime event time to query
     * 
     * @throws IOException all exceptions from the underlying engine are propagated back to the caller
     */
    private void queryMetrics(String tableName, long siteId, long publisherId, Date eventTime) throws IOException {
        HTable table = new HTable(conf, tableName);
        
        Calendar calendar = GregorianCalendar.getInstance();
        calendar.setTime(eventTime);
        
        byte[] startRow = Bytes.toBytes(siteId + "_" + ROW_KEY_FORMAT.format(calendar.getTime()));
        calendar.add(Calendar.HOUR_OF_DAY, 1);
        
        byte[] endRow = Bytes.toBytes(siteId + "_" + ROW_KEY_FORMAT.format(calendar.getTime()));

        byte[] inputFamily = Bytes.toBytes(FAM_PID);
        byte[] inputQualifier = Bytes.toBytes(String.valueOf(publisherId));
        
        Scan scan = new Scan(startRow, endRow);
        scan.addColumn(inputFamily, inputQualifier);
        scan.setMaxVersions(1000000);
        
        ResultScanner scanner = table.getScanner(scan);
        
        if (scanner != null) {
            Result result = null;
            
            do {
                result = scanner.next();
                
                if (result != null) {
                    LOGGER.warn("row : [{0}]", Bytes.toString(result.getRow()));
                    byte[] aggrValue = result.getValue(inputFamily, inputQualifier);
                    LOGGER.warn("row : [{0}], [{1}:{2}] => [{3}]", 
                                Bytes.toString(result.getRow()), 
                                Bytes.toString(inputFamily), 
                                Bytes.toString(inputQualifier), 
                                Bytes.toLong(aggrValue));
                }
            } while (result != null);
        }
    }

    /**
     * splits the input string into parts.
     * 
     * @param input input string [a row in the transaction records]
     * 
     * @return the array of split elements
     */
    protected static String[] splitParts(String input) {
        String[] result = EMPTY_STRING_ARRAY;
        
        // Lines are comma-delimited with the individual columms and their 
        // mappings defined in the {...@link #COLUMNS} map.
        String valueString = input.toString();
        
        if (valueString.length() > 0) {
            valueString = valueString.trim();
            
            // Remove the first " and the last " from the input
            valueString = valueString.substring(1, valueString.length() - 1);
            
            result = valueString.split("\",\"");
        }
        
        return result;
    }

    /**
     * returns <code>true</code> if the input string is not <code>null</code>
     * and has contents.
     *
     * @param input
     *            the string to check
     *
     * @return <code>true</code> if the input string is not <code>null</code>
     *         and has contents.
     */
    public static boolean isGoodString(String input) {
        return ((input != null) && (input.length() > 0) && (input.trim().length() > 0));
    }

    /**
     * loads the data from the value map into the specified table.  
     */
    public static class TableMapperBase
        implements Mapper<LongWritable, Text, ImmutableBytesWritable, HbaseMapWritable>
    {
        /** job configuration */
        private JobConf jobConf_;
        
        /**
         * {...@inheritdoc}
         */
        public void close() throws IOException {
        }

        /**
         * {...@inheritdoc}
         */
        public void configure(JobConf jobConf) {
            this.jobConf_ = jobConf;
        }

        /**
         * returns the job configuration.
         * 
         * @return the job configuration
         */
        public JobConf getJobConf() {
            return this.jobConf_;
        }

        /**
         * {...@inheritdoc}
         */
        public void map(LongWritable key, 
                        Text value,
                        OutputCollector<ImmutableBytesWritable, HbaseMapWritable> collector,
                        Reporter reporter) 
            throws IOException 
        {
            // Lines are comma-delimited with the individual columms and their 
            // mappings defined in the {...@link #COLUMNS} map.
            String valueString = value.toString();
            
            String[] splits = splitParts(valueString);

            if (splits.length == 0) {
                return;
            }

            String rowKey = null;
            
            // RowKey is <site_id>YYYYMMDDHH
            try {
                rowKey = splits[1] + "_" + DATE_FORMAT_OUTPUT.format((DATE_FORMAT_INPUT.parse(splits[0])));
            } 
            catch (ParseException e) {
                String msg = MessageFormat.format("date parsing failed for [{0}]. reason : [{1}]", valueString, e.getMessage());
                LOGGER.error(msg);
                
                throw new IOException(msg);
            }
            
            if (isGoodString(rowKey)) {
                byte[][] metrics = new byte[ORDINALS.length][];

                for (int i1 = 0, n1 = ORDINALS.length; n1 > 0; i1++, n1--) {
                    try {
                        metrics[i1] = Bytes.toBytes(Long.parseLong(splits[ORDINALS[i1]]));
                    }
                    catch (NumberFormatException e) {
                        String msg = MessageFormat.format("metric parsing failed for [{0}], index : [{1}]. reason : [{2}]", valueString, ORDINALS[i1], e.getMessage());
                        LOGGER.error(msg);
                        
                        throw new IOException(msg);
                    }
                }
                
                for (int i = 0; i < splits.length; i++) {
                    String family = COLUMNS.get(i);
                    
                    if (family != null) {
                        String column = family + splits[i];
                        
                        for (int i1 = 0, n1 = ORDINALS.length; n1 > 0; i1++, n1--) {
                            String newRowKey = OUTPUTS[i1] + "_" + rowKey;
                            ImmutableBytesWritable row = new ImmutableBytesWritable(Bytes.toBytes(newRowKey));
                            
                            HbaseMapWritable<byte[], byte[]> mapWritable = new HbaseMapWritable<byte[], byte[]>();
                            mapWritable.put(Bytes.toBytes(column), metrics[i1]);
                            
                            if (LOGGER.isDebugEnabled()) {
                                LOGGER.debug("map ==> key : [{0}], column : [{1}], value : [{2}]", 
                                             newRowKey,
                                             column,
                                             Bytes.toString(metrics[i1]));
                            }

                            collector.collect(row, mapWritable);
                        }
                    }
                }
            }
        }
    }
    
    /**
     * loads the data from the value map into the specified table.  
     */
    public static class TableLoader
        extends MapReduceBase
        implements Reducer<ImmutableBytesWritable, HbaseMapWritable, ImmutableBytesWritable, Put> 
    {
        /** multiple outputs */
        private MultipleOutputs multipleOutputs_;
        
        /** job configuration */
        private JobConf jobConf_;
        
        /**
         * {...@inheritdoc}
         */
        public void close() throws IOException {
            this.multipleOutputs_.close();
        }

        /**
         * {...@inheritdoc}
         */
        public void configure(JobConf jobConf) {
            this.jobConf_ = jobConf;
            this.multipleOutputs_ = new MultipleOutputs(jobConf);
        }

        /**
         * returns the job configuration.
         * 
         * @return the job configuration
         */
        public JobConf getJobConf() {
            return this.jobConf_;
        }

        /**
         * {...@inheritdoc}
         */
        public void reduce(ImmutableBytesWritable key, 
                           Iterator<HbaseMapWritable> values,
                           OutputCollector<ImmutableBytesWritable, Put> collector,
                           Reporter reporter)
            throws IOException 
        {
            String fullRowKey = Bytes.toString(key.get());
            
            int idx = fullRowKey.indexOf("_");
            
            String output = fullRowKey.substring(0, idx);
            
            String rowKey = fullRowKey.substring(idx + 1);
            byte[] rowKeyBytes = Bytes.toBytes(rowKey);
            ImmutableBytesWritable row = new ImmutableBytesWritable(rowKeyBytes);
            
            while (values.hasNext()) {
                HbaseMapWritable<byte[], byte[]> value = values.next();
                
                Put put = new Put(rowKeyBytes);
                long versionTimestamp = System.nanoTime();
                
                for (Entry<byte[], byte[]> entry : value.entrySet()) {
                    put.add(entry.getKey(), versionTimestamp, entry.getValue());
                }
                
                this.multipleOutputs_.getCollector(output, reporter).collect(row, put);
            }
        }
    }

    /**
     * Convert Map/Reduce output and write it to an HBase table.
     */
    public static class TestTableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {
        /** JobConf parameter that specifies the output table */
        public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";
        
        /** LOGGER for TableOutputFormat */
        private static final LogService LOGGER = LogService.getLogService(TestTableOutputFormat.class);
        
        private static HBaseConfiguration configuration_;
        
        private String tableName_;

        public TestTableOutputFormat(String tableName) {
            setTableName(tableName);
        }

        /**
         * {...@inheritdoc}
         */
        public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(FileSystem ignored,
                JobConf jobConf, String name, Progressable progress) throws IOException 
        {
            LOGGER.warn("fetching new record writer ...");
            
            String tableName = getTableName();

            HTable table = null;
            
            try {
                if (configuration_ == null) {
                    configuration_ = new HBaseConfiguration(jobConf);
                }

                table = new HTable(configuration_, tableName);
            } 
            catch (IOException e) {
                LOGGER.error(e, "failed to initialize the hbase configuration");
                throw e;
            }
            
            table.setAutoFlush(true);
            
            RecordWriter<ImmutableBytesWritable, Put> result = new TableRecordWriter(table, tableName);
            
            LOGGER.warn("fetching new record writer completed");
            
            return result;
        }

        private void setTableName(String tableName) {
            this.tableName_ = tableName;
        }

        private String getTableName() {
            return this.tableName_;
        }

        /**
         * {...@inheritdoc}
         */
        @Override
        public void checkOutputSpecs(FileSystem ignored, JobConf jobConf)
            throws FileAlreadyExistsException, InvalidJobConfException, IOException 
        {
        }
    }
    
    public static class M1OutputFormat extends TestTableOutputFormat {
        public M1OutputFormat() {
            super("test_t1_m1");
        }
    }
    
    public static class M2OutputFormat extends TestTableOutputFormat {
        public M2OutputFormat() {
            super("test_t1_m2");
        }
    }
    
    /**
     * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
     * and write to an HBase table.
     */
    protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {
        /** table tied with the current record writer. */
        private HTable table_;
        
        /** table name tied with the current record writer. */
        private String tableName_;
        
        /**
         * Instantiate a TableRecordWriter with the HBase HClient for writing.
         * 
         * @param hbaseAdmin hbase admin
         * @param table current table
         * @param tableName name of the current table
         */
        public TableRecordWriter(HTable table, String tableName) {
            this.table_ = table;
            this.tableName_ = tableName;
            
            LOGGER.warn("creating a new table record writer for [{0}]", this.tableName_);
        }

        /**
         * {...@inheritdoc}
         */
        public void close(Reporter reporter) throws IOException {
            LOGGER.warn("close called. flushing the commits ...");
            this.table_.flushCommits();
            LOGGER.warn("close called. flushing the commits completed");

            LOGGER.warn("close called. closing the table ...");
            this.table_.close();
            LOGGER.warn("close called. closing the table completed");
        }

        /**
         * {...@inheritdoc}
         */
        public void write(ImmutableBytesWritable key, Put put) throws IOException {
            byte[] row = put.getRow();

            for (Map.Entry<byte[], List<KeyValue>> familyEntry : put.getFamilyMap().entrySet()) {
                byte[] family = familyEntry.getKey();

                for (KeyValue keyValue : familyEntry.getValue()) {
                    byte[] qualifier = keyValue.getQualifier();

                    long amount = Bytes.toLong(keyValue.getValue());

                    this.table_.incrementColumnValue(row, family, qualifier, amount);
                }
            }
        }
    }
}

Attachment: hbase_performance.xls
Description: application/ole-storage

Reply via email to