Hi,

HBase/Hadoop setup:
1. 3 region servers.
2. The job runs with 20 map tasks and 20 reduce tasks.
3. HBase is an older build from trunk [Version: 0.20.0-dev, r786695, Sat Jun 20 18:01:17 EDT 2009].
4. Hadoop 0.20.0.
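As a point of reference, the sketch below shows roughly how the task counts above map onto the old-API JobConf on Hadoop 0.20. It is illustrative only: the attached code actually reads the counts from the mapred.map.tasks / mapred.reduce.tasks system properties, and the class name here is made up.

import org.apache.hadoop.mapred.JobConf;

// Illustrative sketch only (class name is hypothetical): wiring the
// 20 map / 20 reduce task counts described above into an old-API job.
public class TaskCountSketch {
  public static JobConf newConf() {
    JobConf conf = new JobConf();
    conf.setNumMapTasks(20);     // a hint; the real map count follows the input splits
    conf.setNumReduceTasks(20);  // the reduce count is taken as-is
    return conf;
  }
}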
Test data:
1. The input is a CSV file with 1M rows, about 20 columns, and 4 metrics.
2. The output is 4 HBase tables: "txn_m1", "txn_m2", "txn_m3", "txn_m4".

The task is to parse the CSV file and, for each metric m1, create an entry in the HBase table "txn_m1" with the columns as needed. Attached is a PDF [exported from an Excel sheet] which explains how a single row in the CSV is converted into HBase data in the mapper and reducer stages. The code is attached as well.

Processing the 1M records takes about 38 minutes. I am using HTable.incrementColumnValue() in the reduce pass to create the records in the HBase tables. Is there anything I should be doing differently, or anything inherently incorrect? I would like to run this task in 1 minute.

Thanks for the help,
Irfan

Here is the output of the process. Let me know if I should attach any other log.

09/07/02 15:19:11 INFO mapred.JobClient: Running job: job_200906192236_5114
09/07/02 15:19:12 INFO mapred.JobClient: map 0% reduce 0%
09/07/02 15:19:29 INFO mapred.JobClient: map 30% reduce 0%
09/07/02 15:19:32 INFO mapred.JobClient: map 46% reduce 0%
09/07/02 15:19:35 INFO mapred.JobClient: map 64% reduce 0%
09/07/02 15:19:38 INFO mapred.JobClient: map 75% reduce 0%
09/07/02 15:19:44 INFO mapred.JobClient: map 76% reduce 0%
09/07/02 15:19:47 INFO mapred.JobClient: map 99% reduce 1%
09/07/02 15:19:50 INFO mapred.JobClient: map 100% reduce 3%
09/07/02 15:19:53 INFO mapred.JobClient: map 100% reduce 4%
09/07/02 15:19:56 INFO mapred.JobClient: map 100% reduce 10%
09/07/02 15:19:59 INFO mapred.JobClient: map 100% reduce 12%
09/07/02 15:20:02 INFO mapred.JobClient: map 100% reduce 16%
09/07/02 15:20:05 INFO mapred.JobClient: map 100% reduce 25%
09/07/02 15:20:08 INFO mapred.JobClient: map 100% reduce 33%
09/07/02 15:20:11 INFO mapred.JobClient: map 100% reduce 36%
09/07/02 15:20:14 INFO mapred.JobClient: map 100% reduce 39%
09/07/02 15:20:17 INFO mapred.JobClient: map 100% reduce 41%
09/07/02 15:20:29 INFO mapred.JobClient: map 100% reduce 42%
09/07/02 15:20:32 INFO mapred.JobClient: map 100% reduce 44%
09/07/02 15:20:38 INFO mapred.JobClient: map 100% reduce 46%
09/07/02 15:20:49 INFO mapred.JobClient: map 100% reduce 47%
09/07/02 15:20:55 INFO mapred.JobClient: map 100% reduce 50%
09/07/02 15:21:01 INFO mapred.JobClient: map 100% reduce 51%
09/07/02 15:21:34 INFO mapred.JobClient: map 100% reduce 52%
09/07/02 15:21:39 INFO mapred.JobClient: map 100% reduce 53%
09/07/02 15:22:06 INFO mapred.JobClient: map 100% reduce 54%
09/07/02 15:22:28 INFO mapred.JobClient: map 100% reduce 55%
09/07/02 15:22:44 INFO mapred.JobClient: map 100% reduce 56%
09/07/02 15:23:02 INFO mapred.JobClient: Task Id : attempt_200906192236_5114_r_000002_0, Status : FAILED
attempt_200906192236_5114_r_000002_0: [2009-07-02 15:20:27.230] fetching new record writer ...
attempt_200906192236_5114_r_000002_0: [2009-07-02 15:22:51.429] failed to initialize the hbase configuration
09/07/02 15:23:08 INFO mapred.JobClient: map 100% reduce 53%
09/07/02 15:23:08 INFO mapred.JobClient: Task Id : attempt_200906192236_5114_r_000013_0, Status : FAILED
org.apache.hadoop.hbase.client.NoServerForRegionException: Timed out trying to locate root region
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRootRegion(HConnectionManager.java:863)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:514)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:523)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:527)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:490)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:124)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:107)
    at com.qwapi.txnload.LoadMultipleCubes$CubeOutputFormat.getRecordWriter(LoadMultipleCubes.java:442)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:435)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:413)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)
attempt_200906192236_5114_r_000013_0: [2009-07-02 15:20:33.183] fetching new record writer ...
attempt_200906192236_5114_r_000013_0: [2009-07-02 15:23:04.369] failed to initialize the hbase configuration
09/07/02 15:23:09 INFO mapred.JobClient: map 100% reduce 50%
09/07/02 15:23:14 INFO mapred.JobClient: Task Id : attempt_200906192236_5114_r_000012_0, Status : FAILED
org.apache.hadoop.hbase.client.NoServerForRegionException: Timed out trying to locate root region
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRootRegion(HConnectionManager.java:863)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:514)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:523)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:527)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:490)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:124)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:107)
    at com.qwapi.txnload.LoadMultipleCubes$CubeOutputFormat.getRecordWriter(LoadMultipleCubes.java:442)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:435)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:413)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)
attempt_200906192236_5114_r_000012_0: [2009-07-02 15:20:48.434] fetching new record writer ...
attempt_200906192236_5114_r_000012_0: [2009-07-02 15:23:10.185] failed to initialize the hbase configuration
09/07/02 15:23:15 INFO mapred.JobClient: map 100% reduce 48%
09/07/02 15:23:17 INFO mapred.JobClient: Task Id : attempt_200906192236_5114_r_000014_0, Status : FAILED
org.apache.hadoop.hbase.client.NoServerForRegionException: Timed out trying to locate root region
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRootRegion(HConnectionManager.java:863)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:514)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:523)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.relocateRegion(HConnectionManager.java:496)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegionInMeta(HConnectionManager.java:628)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:527)
    at org.apache.hadoop.hbase.client.HConnectionManager$TableServers.locateRegion(HConnectionManager.java:490)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:124)
    at org.apache.hadoop.hbase.client.HTable.<init>(HTable.java:107)
    at com.qwapi.txnload.LoadMultipleCubes$CubeOutputFormat.getRecordWriter(LoadMultipleCubes.java:442)
    at org.apache.hadoop.mapred.ReduceTask.runOldReducer(ReduceTask.java:435)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:413)
    at org.apache.hadoop.mapred.Child.main(Child.java:170)
attempt_200906192236_5114_r_000014_0: [2009-07-02 15:20:47.442] fetching new record writer ...
attempt_200906192236_5114_r_000014_0: [2009-07-02 15:23:13.285] failed to initialize the hbase configuration
09/07/02 15:23:18 INFO mapred.JobClient: map 100% reduce 45%
09/07/02 15:23:21 INFO mapred.JobClient: map 100% reduce 46%
09/07/02 15:23:29 INFO mapred.JobClient: map 100% reduce 47%
09/07/02 15:23:32 INFO mapred.JobClient: map 100% reduce 48%
09/07/02 15:23:36 INFO mapred.JobClient: map 100% reduce 49%
09/07/02 15:23:39 INFO mapred.JobClient: map 100% reduce 51%
09/07/02 15:23:42 INFO mapred.JobClient: map 100% reduce 56%
09/07/02 15:23:45 INFO mapred.JobClient: map 100% reduce 58%
09/07/02 15:24:20 INFO mapred.JobClient: map 100% reduce 59%
09/07/02 15:25:11 INFO mapred.JobClient: map 100% reduce 60%
09/07/02 15:25:17 INFO mapred.JobClient: map 100% reduce 61%
09/07/02 15:25:26 INFO mapred.JobClient: map 100% reduce 62%
09/07/02 15:25:32 INFO mapred.JobClient: map 100% reduce 64%
09/07/02 15:25:38 INFO mapred.JobClient: map 100% reduce 65%
09/07/02 15:26:20 INFO mapred.JobClient: map 100% reduce 66%
09/07/02 15:26:40 INFO mapred.JobClient: map 100% reduce 67%
09/07/02 15:26:48 INFO mapred.JobClient: map 100% reduce 68%
09/07/02 15:27:16 INFO mapred.JobClient: map 100% reduce 69%
09/07/02 15:27:21 INFO mapred.JobClient: map 100% reduce 70%
09/07/02 15:27:46 INFO mapred.JobClient: map 100% reduce 71%
09/07/02 15:28:25 INFO mapred.JobClient: map 100% reduce 72%
09/07/02 15:28:46 INFO mapred.JobClient: map 100% reduce 73%
09/07/02 15:29:08 INFO mapred.JobClient: map 100% reduce 74%
09/07/02 15:29:45 INFO mapred.JobClient: map 100% reduce 76%
09/07/02 15:30:42 INFO mapred.JobClient: map 100% reduce 77%
09/07/02 15:31:06 INFO mapred.JobClient: map 100% reduce 78%
09/07/02 15:31:12 INFO mapred.JobClient: map 100% reduce 79%
09/07/02 15:31:36 INFO mapred.JobClient: map 100% reduce 81%
09/07/02 15:31:37 INFO mapred.JobClient: map 100% reduce 82%
09/07/02 15:32:00 INFO mapred.JobClient: map 100% reduce 83%
09/07/02 15:32:09 INFO mapred.JobClient: map 100% reduce 84%
09/07/02 15:32:30 INFO mapred.JobClient: map 100% reduce 86%
09/07/02 15:38:42 INFO mapred.JobClient: map 100% reduce 88%
09/07/02 15:39:49 INFO mapred.JobClient: map 100% reduce 89%
09/07/02 15:41:13 INFO mapred.JobClient: map 100% reduce 90%
09/07/02 15:41:16 INFO mapred.JobClient: map 100% reduce 91%
09/07/02 15:41:28 INFO mapred.JobClient: map 100% reduce 93%
09/07/02 15:44:34 INFO mapred.JobClient: map 100% reduce 94%
09/07/02 15:45:41 INFO mapred.JobClient: map 100% reduce 95%
09/07/02 15:45:50 INFO mapred.JobClient: map 100% reduce 96%
09/07/02 15:46:17 INFO mapred.JobClient: map 100% reduce 98%
09/07/02 15:55:29 INFO mapred.JobClient: map 100% reduce 99%
09/07/02 15:57:08 INFO mapred.JobClient: map 100% reduce 100%
09/07/02 15:57:14 INFO mapred.JobClient: Job complete: job_200906192236_5114
09/07/02 15:57:14 INFO mapred.JobClient: Counters: 18
09/07/02 15:57:14 INFO mapred.JobClient: Job Counters
09/07/02 15:57:14 INFO mapred.JobClient: Launched reduce tasks=24
09/07/02 15:57:14 INFO mapred.JobClient: Rack-local map tasks=2
09/07/02 15:57:14 INFO mapred.JobClient: Launched map tasks=20
09/07/02 15:57:14 INFO mapred.JobClient: Data-local map tasks=18
09/07/02 15:57:14 INFO mapred.JobClient: FileSystemCounters
09/07/02 15:57:14 INFO mapred.JobClient: FILE_BYTES_READ=1848609562
09/07/02 15:57:14 INFO mapred.JobClient: HDFS_BYTES_READ=57982980
09/07/02 15:57:14 INFO mapred.JobClient: FILE_BYTES_WRITTEN=2768325646
09/07/02 15:57:14 INFO mapred.JobClient: Map-Reduce Framework
09/07/02 15:57:14 INFO mapred.JobClient: Reduce input groups=4863
09/07/02 15:57:14 INFO mapred.JobClient: Combine output records=0
09/07/02 15:57:14 INFO mapred.JobClient: Map input records=294786
09/07/02 15:57:14 INFO mapred.JobClient: Reduce shuffle bytes=883803390
09/07/02 15:57:14 INFO mapred.JobClient: Reduce output records=0
09/07/02 15:57:14 INFO mapred.JobClient: Spilled Records=50956464
09/07/02 15:57:14 INFO mapred.JobClient: Map output bytes=888797024
09/07/02 15:57:14 INFO mapred.JobClient: Map input bytes=57966580
09/07/02 15:57:14 INFO mapred.JobClient: Combine input records=0
09/07/02 15:57:14 INFO mapred.JobClient: Map output records=16985488
09/07/02 15:57:14 INFO mapred.JobClient: Reduce input records=16985488
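Before the full listing, here is a condensed sketch of the write path I described above. It mirrors TableRecordWriter.write() in the attached code; the wrapper class name is only for illustration. Every cell carried by a Put becomes one HTable.incrementColumnValue() call.

import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Condensed view of the per-record write path (wrapper class is illustrative).
public class WritePathSketch {
  public static void writeOne(HTable table, Put put) throws IOException {
    byte[] row = put.getRow();
    for (Map.Entry<byte[], List<KeyValue>> familyEntry : put.getFamilyMap().entrySet()) {
      byte[] family = familyEntry.getKey();
      for (KeyValue kv : familyEntry.getValue()) {
        // one increment per metric cell
        table.incrementColumnValue(row, family, kv.getQualifier(), Bytes.toLong(kv.getValue()));
      }
    }
  }
}

The full code I am running follows.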
import java.io.DataOutputStream;
import java.io.IOException;
import java.text.DateFormat;
import java.text.MessageFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.GregorianCalendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordWriter;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;
import org.apache.hadoop.util.Progressable;

import com.qwapi.txnload.util.LogService;

/**
 * map/reduce job to load the data into the cubes.
 */
public class MultipleTablesTest extends HBaseClusterTestCase {

  /** logger for the MultipleTablesTest class */
  private static final LogService LOGGER = LogService.getLogService(MultipleTablesTest.class);

  /** empty string array */
  private static final String[] EMPTY_STRING_ARRAY = new String[0];

  /** date format output [yyyyMMddHH] */
  private static final SimpleDateFormat DATE_FORMAT_OUTPUT = new SimpleDateFormat("yyyyMMddHH");

  /** date format input [yyyy-MM-dd HH:mm:ss] */
  private static final SimpleDateFormat DATE_FORMAT_INPUT = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

  /** map of ordinal ==> column_family */
  private static final Map<Integer, String> COLUMNS = new HashMap<Integer, String>(50);

  private static final int[] ORDINALS = new int[] { 3, 4, };

  private static final String[] TABLE_NAMES = new String[] { "test_t1_m1", "test_t1_m2" };

  private static final String[] OUTPUTS = new String[] { "m1", "m2" };

  private static final Class<? extends TestTableOutputFormat>[] OUTPUT_FORMAT_CLASSES =
      new Class[] { M1OutputFormat.class, M2OutputFormat.class };

  /** row key format */
  private static final DateFormat ROW_KEY_FORMAT = new SimpleDateFormat("yyyyMMddHH");

  /** sid */
  private static final String FAM_SID = "sid";

  /** pid */
  private static final String FAM_PID = "pid";

  static {
    COLUMNS.put(1, FAM_SID + ":");
    COLUMNS.put(2, FAM_PID + ":");
  }

  /** input data */
  private static final Object[][] INPUT_DATA = new Object[][] {
      { "2009-06-12 12:50:08", 100, 1000, 1, 2, },
      { "2009-06-12 12:54:08", 100, 1000, 13, 45, },
      { "2009-06-12 15:18:08", 100, 2000, 2, 3, },
      { "2009-06-13 16:50:08", 100, 2000, 12, 83, },
      { "2009-06-13 18:50:08", 200, 2000, 8, 5, },
      { "2009-06-12 23:50:08", 200, 1000, 2, 3, },
  };

  /**
   * creates an instance of the map/reduce load test.
   */
  public MultipleTablesTest() {
  }

  /**
   * setup the test case components.
   *
   * @throws java.lang.Exception exception setting up the test case
   */
  public void setUp() throws Exception {
  }

  /**
   * tear down the test case components.
   *
   * @throws java.lang.Exception exception tearing down the test case
   */
  public void tearDown() throws Exception {
  }

  private String[] getFamilies() {
    List<String> result = new ArrayList<String>(10);
    for (Map.Entry<Integer, String> entry : COLUMNS.entrySet()) {
      result.add(entry.getValue());
    }
    return result.toArray(EMPTY_STRING_ARRAY);
  }

  public void testLoadMultipleTables() throws Exception {
    for (int i1 = 0, n1 = TABLE_NAMES.length; n1 > 0; i1++, n1--) {
      String tableName = TABLE_NAMES[i1];
      createTable(tableName, getFamilies());
    }
    JobConf conf = new JobConf();
    conf.setBoolean("localFS", true);
    Path inDir = setupInputDirectory(conf);
    conf.setJobName("chain");
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputKeyClass(ImmutableBytesWritable.class);
    conf.setOutputValueClass(Put.class);
    conf.setMapOutputKeyClass(ImmutableBytesWritable.class);
    conf.setMapOutputValueClass(HbaseMapWritable.class);
    for (int j1 = 0, m1 = OUTPUTS.length; m1 > 0; j1++, m1--) {
      MultipleOutputs.addNamedOutput(conf, OUTPUTS[j1], OUTPUT_FORMAT_CLASSES[j1],
          ImmutableBytesWritable.class, Put.class);
    }
    conf.setOutputFormat(M1OutputFormat.class);
    conf.setOutputKeyClass(ImmutableBytesWritable.class);
    conf.setOutputValueClass(Put.class);
    FileInputFormat.setInputPaths(conf, inDir);
    int numMapTasks = Integer.parseInt(System.getProperty("mapred.map.tasks", "10"));
    conf.setInt("mapred.map.tasks", numMapTasks);
    conf.setMapperClass(TableMapperBase.class);
    conf.setReducerClass(TableLoader.class);
    int numReduceTasks = Integer.parseInt(System.getProperty("mapred.reduce.tasks", "10"));
    conf.setNumReduceTasks(numReduceTasks);
    JobClient jc = new JobClient(conf);
    RunningJob job = jc.submitJob(conf);
    while (!job.isComplete()) {
      Thread.sleep(100);
    }
    for (int i1 = 0, n1 = TABLE_NAMES.length; n1 > 0; i1++, n1--) {
      queryMetrics(TABLE_NAMES[i1], 100, 1000, DATE_FORMAT_INPUT.parse("2009-06-12 12:50:00"));
    }
  }

  /**
   * Creates a temporary table in HBase.
   *
   * @param tableName the name of the table which should be created
   * @param families the column families to be created in the table
   *
   * @return the HTable instance that was created
   *
   * @throws java.io.IOException if the table cannot be created
   */
  private HTable createTable(String tableName, String[] families) throws IOException {
    /*
     * The conf variable is inherited from the parent HBaseClusterTestCase.
     */
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
    /*
     * Add the columns to the table.
     */
    for (int i = 0; i < families.length; i++) {
      tableDescriptor.addFamily(new HColumnDescriptor(families[i]));
    }
    if (admin.tableExists(tableName)) {
      admin.disableTable(tableName);
      admin.deleteTable(tableName);
    }
    if (!admin.tableExists(tableName)) {
      /*
       * Create the table.
       * Note - this table will not exist after the test case completes.
       */
      admin.createTable(tableDescriptor);
    }
    /*
     * Get a reference to the just created table.
     */
    HTable table = new HTable(conf, tableName);
    return table;
  }

  private Path setupInputDirectory(JobConf conf) throws IOException {
    Path inDir = new Path("testing/chain/input");
    String localPathRoot = System.getProperty("test.build.data", "/tmp").replace(' ', '+');
    inDir = new Path(localPathRoot, inDir);
    FileSystem fs = FileSystem.get(conf);
    if (!fs.mkdirs(inDir)) {
      throw new IOException("Mkdirs failed to create " + inDir.toString());
    }
    DataOutputStream file = fs.create(new Path(inDir, "part-0"));
    for (int i1 = 0, n1 = INPUT_DATA.length; n1 > 0; i1++, n1--) {
      for (int j1 = 0, m1 = INPUT_DATA[i1].length; m1 > 0; j1++, m1--) {
        file.writeBytes("\"" + String.valueOf(INPUT_DATA[i1][j1]) + "\"");
        if (m1 > 1) {
          file.writeBytes(",");
        }
      }
      file.writeBytes("\n");
    }
    file.close();
    return inDir;
  }

  /**
   * sample method for querying the hbase tables.
   *
   * @param tableName table to query against
   * @param siteId id of the site to query
   * @param publisherId id of the publisher to query
   * @param eventTime event time to query
   *
   * @throws IOException all exceptions from the underlying engine are propagated back to the caller
   */
  private void queryMetrics(String tableName, long siteId, long publisherId, Date eventTime) throws IOException {
    HTable table = new HTable(conf, tableName);
    Calendar calendar = GregorianCalendar.getInstance();
    calendar.setTime(eventTime);
    byte[] startRow = Bytes.toBytes(siteId + "_" + ROW_KEY_FORMAT.format(calendar.getTime()));
    calendar.add(Calendar.HOUR_OF_DAY, 1);
    byte[] endRow = Bytes.toBytes(siteId + "_" + ROW_KEY_FORMAT.format(calendar.getTime()));
    byte[] inputFamily = Bytes.toBytes(FAM_PID);
    byte[] inputQualifier = Bytes.toBytes(String.valueOf(publisherId));
    Scan scan = new Scan(startRow, endRow);
    scan.addColumn(inputFamily, inputQualifier);
    scan.setMaxVersions(1000000);
    ResultScanner scanner = table.getScanner(scan);
    if (scanner != null) {
      Result result = null;
      do {
        result = scanner.next();
        if (result != null) {
          LOGGER.warn("row : [{0}]", Bytes.toString(result.getRow()));
          byte[] aggrValue = result.getValue(inputFamily, inputQualifier);
          LOGGER.warn("row : [{0}], [{1}:{2}] => [{3}]",
              Bytes.toString(result.getRow()), Bytes.toString(inputFamily),
              Bytes.toString(inputQualifier), Bytes.toLong(aggrValue));
        }
      } while (result != null);
    }
  }

  /**
   * splits the input string into parts.
   *
   * @param input input string [a row in the transaction records]
   *
   * @return the array of split elements
   */
  protected static String[] splitParts(String input) {
    String[] result = EMPTY_STRING_ARRAY;
    // Lines are comma-delimited with the individual columns and their
    // mappings defined in the {@link #COLUMNS} map.
    String valueString = input.toString();
    if (valueString.length() > 0) {
      valueString = valueString.trim();
      // Remove the first " and the last " from the input
      valueString = valueString.substring(1, valueString.length() - 1);
      result = valueString.split("\",\"");
    }
    return result;
  }

  /**
   * returns <code>true</code> if the input string is not <code>null</code>
   * and has contents.
   *
   * @param input the string to check
   *
   * @return <code>true</code> if the input string is not <code>null</code> and has contents.
   */
  public static boolean isGoodString(String input) {
    return ((input != null) && (input.length() > 0) && (input.trim().length() > 0));
  }

  /**
   * maps each input row into per-metric key/value pairs.
   */
  public static class TableMapperBase
      implements Mapper<LongWritable, Text, ImmutableBytesWritable, HbaseMapWritable> {

    /** job configuration */
    private JobConf jobConf_;

    /**
     * {@inheritDoc}
     */
    public void close() throws IOException {
    }

    /**
     * {@inheritDoc}
     */
    public void configure(JobConf jobConf) {
      this.jobConf_ = jobConf;
    }

    /**
     * returns the job configuration.
     *
     * @return the job configuration
     */
    public JobConf getJobConf() {
      return this.jobConf_;
    }

    /**
     * {@inheritDoc}
     */
    public void map(LongWritable key, Text value,
        OutputCollector<ImmutableBytesWritable, HbaseMapWritable> collector,
        Reporter reporter) throws IOException {
      // Lines are comma-delimited with the individual columns and their
      // mappings defined in the {@link #COLUMNS} map.
      String valueString = value.toString();
      String[] splits = splitParts(valueString);
      if (splits.length == 0) {
        return;
      }
      String rowKey = null;
      // RowKey is <site_id>_YYYYMMDDHH
      try {
        rowKey = splits[1] + "_" + DATE_FORMAT_OUTPUT.format((DATE_FORMAT_INPUT.parse(splits[0])));
      } catch (ParseException e) {
        String msg = MessageFormat.format("date parsing failed for [{0}]. reason : [{1}]",
            valueString, e.getMessage());
        LOGGER.error(msg);
        throw new IOException(msg);
      }
      if (isGoodString(rowKey)) {
        byte[][] metrics = new byte[ORDINALS.length][];
        for (int i1 = 0, n1 = ORDINALS.length; n1 > 0; i1++, n1--) {
          try {
            metrics[i1] = Bytes.toBytes(Long.parseLong(splits[ORDINALS[i1]]));
          } catch (NumberFormatException e) {
            String msg = MessageFormat.format("metric parsing failed for [{0}], index : [{1}]. reason : [{2}]",
                valueString, ORDINALS[i1], e.getMessage());
            LOGGER.error(msg);
            throw new IOException(msg);
          }
        }
        for (int i = 0; i < splits.length; i++) {
          String family = COLUMNS.get(i);
          if (family != null) {
            String column = family + splits[i];
            for (int i1 = 0, n1 = ORDINALS.length; n1 > 0; i1++, n1--) {
              String newRowKey = OUTPUTS[i1] + "_" + rowKey;
              ImmutableBytesWritable row = new ImmutableBytesWritable(Bytes.toBytes(newRowKey));
              HbaseMapWritable<byte[], byte[]> mapWritable = new HbaseMapWritable<byte[], byte[]>();
              mapWritable.put(Bytes.toBytes(column), metrics[i1]);
              if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("map ==> key : [{0}], column : [{1}], value : [{2}]",
                    newRowKey, column, Bytes.toString(metrics[i1]));
              }
              collector.collect(row, mapWritable);
            }
          }
        }
      }
    }
  }

  /**
   * loads the data from the value map into the specified table.
   */
  public static class TableLoader extends MapReduceBase
      implements Reducer<ImmutableBytesWritable, HbaseMapWritable, ImmutableBytesWritable, Put> {

    /** multiple outputs */
    private MultipleOutputs multipleOutputs_;

    /** job configuration */
    private JobConf jobConf_;

    /**
     * {@inheritDoc}
     */
    public void close() throws IOException {
      this.multipleOutputs_.close();
    }

    /**
     * {@inheritDoc}
     */
    public void configure(JobConf jobConf) {
      this.jobConf_ = jobConf;
      this.multipleOutputs_ = new MultipleOutputs(jobConf);
    }

    /**
     * returns the job configuration.
     *
     * @return the job configuration
     */
    public JobConf getJobConf() {
      return this.jobConf_;
    }

    /**
     * {@inheritDoc}
     */
    public void reduce(ImmutableBytesWritable key, Iterator<HbaseMapWritable> values,
        OutputCollector<ImmutableBytesWritable, Put> collector, Reporter reporter) throws IOException {
      String fullRowKey = Bytes.toString(key.get());
      int idx = fullRowKey.indexOf("_");
      String output = fullRowKey.substring(0, idx);
      String rowKey = fullRowKey.substring(idx + 1);
      byte[] rowKeyBytes = Bytes.toBytes(rowKey);
      ImmutableBytesWritable row = new ImmutableBytesWritable(rowKeyBytes);
      while (values.hasNext()) {
        HbaseMapWritable<byte[], byte[]> value = values.next();
        Put put = new Put(rowKeyBytes);
        long versionTimestamp = System.nanoTime();
        for (Entry<byte[], byte[]> entry : value.entrySet()) {
          put.add(entry.getKey(), versionTimestamp, entry.getValue());
        }
        this.multipleOutputs_.getCollector(output, reporter).collect(row, put);
      }
    }
  }

  /**
   * Convert Map/Reduce output and write it to an HBase table.
   */
  public static class TestTableOutputFormat extends FileOutputFormat<ImmutableBytesWritable, Put> {

    /** JobConf parameter that specifies the output table */
    public static final String OUTPUT_TABLE = "hbase.mapred.outputtable";

    /** LOGGER for TableOutputFormat */
    private static final LogService LOGGER = LogService.getLogService(TestTableOutputFormat.class);

    private static HBaseConfiguration configuration_;

    private String tableName_;

    public TestTableOutputFormat(String tableName) {
      setTableName(tableName);
    }

    /**
     * {@inheritDoc}
     */
    public RecordWriter<ImmutableBytesWritable, Put> getRecordWriter(FileSystem ignored,
        JobConf jobConf, String name, Progressable progress) throws IOException {
      LOGGER.warn("fetching new record writer ...");
      String tableName = getTableName();
      HTable table = null;
      try {
        if (configuration_ == null) {
          configuration_ = new HBaseConfiguration(jobConf);
        }
        table = new HTable(configuration_, tableName);
      } catch (IOException e) {
        LOGGER.error(e, "failed to initialize the hbase configuration");
        throw e;
      }
      table.setAutoFlush(true);
      RecordWriter<ImmutableBytesWritable, Put> result = new TableRecordWriter(table, tableName);
      LOGGER.warn("fetching new record writer completed");
      return result;
    }

    private void setTableName(String tableName) {
      this.tableName_ = tableName;
    }

    private String getTableName() {
      return this.tableName_;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void checkOutputSpecs(FileSystem ignored, JobConf jobConf)
        throws FileAlreadyExistsException, InvalidJobConfException, IOException {
    }
  }

  public static class M1OutputFormat extends TestTableOutputFormat {
    public M1OutputFormat() {
      super("test_t1_m1");
    }
  }

  public static class M2OutputFormat extends TestTableOutputFormat {
    public M2OutputFormat() {
      super("test_t1_m2");
    }
  }

  /**
   * Convert Reduce output (key, value) to (HStoreKey, KeyedDataArrayWritable)
   * and write to an HBase table.
   */
  protected static class TableRecordWriter implements RecordWriter<ImmutableBytesWritable, Put> {

    /** table tied with the current record writer. */
    private HTable table_;

    /** table name tied with the current record writer. */
    private String tableName_;

    /**
     * Instantiate a TableRecordWriter with the HBase HClient for writing.
     *
     * @param table current table
     * @param tableName name of the current table
     */
    public TableRecordWriter(HTable table, String tableName) {
      this.table_ = table;
      this.tableName_ = tableName;
      LOGGER.warn("creating a new table record writer for [{0}]", this.tableName_);
    }

    /**
     * {@inheritDoc}
     */
    public void close(Reporter reporter) throws IOException {
      LOGGER.warn("close called. flushing the commits ...");
      this.table_.flushCommits();
      LOGGER.warn("close called. flushing the commits completed");
      LOGGER.warn("close called. closing the table ...");
      this.table_.close();
      LOGGER.warn("close called. closing the table completed");
    }

    /**
     * {@inheritDoc}
     */
    public void write(ImmutableBytesWritable key, Put put) throws IOException {
      byte[] row = put.getRow();
      for (Map.Entry<byte[], List<KeyValue>> familyEntry : put.getFamilyMap().entrySet()) {
        byte[] family = familyEntry.getKey();
        for (KeyValue keyValue : familyEntry.getValue()) {
          byte[] qualifier = keyValue.getQualifier();
          long amount = Bytes.toLong(keyValue.getValue());
          this.table_.incrementColumnValue(row, family, qualifier, amount);
        }
      }
    }
  }
}
[Attachment: hbase_performance.xls (application/ole-storage)]