Hi, I am kind of stuck on this one. I read all the other similar issues and coded based on them, but I still get this error.
Any help or a clue to move forward would be appreciated. Thanks.

On Mon, Sep 10, 2012 at 7:06 PM, Jothikumar Ekanath <kbmku...@gmail.com> wrote:
> Hi,
> Getting this error while using HBase as a sink.
>
> Error
> java.io.IOException: Pass a Delete or a Put
>         at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
>         at org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
>         at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:587)
>         at org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>         at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
>         at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>         at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
>         at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
>         at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>         at org.apache.hadoop.mapred.Child.main(Child.java:249)
>
> Below is my code, using the following versions:
>
> HBase  - 0.94
> Hadoop - 1.0.3
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.KeyValue;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.client.Result;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> import org.apache.hadoop.hbase.mapreduce.TableMapper;
> import org.apache.hadoop.hbase.mapreduce.TableReducer;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.*;
>
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.util.ArrayList;
> import java.util.List;
>
> public class DailyAggMapReduce {
>
>     public static void main(String args[]) throws Exception {
>         Configuration config = HBaseConfiguration.create();
>         Job job = new Job(config, "DailyAverageMR");
>         job.setJarByClass(DailyAggMapReduce.class);
>         Scan scan = new Scan();
>         // 1 is the default in Scan, which will be bad for MapReduce jobs
>         scan.setCaching(500);
>         // don't set to true for MR jobs
>         scan.setCacheBlocks(false);
>
>         TableMapReduceUtil.initTableMapperJob(
>                 "HTASDB",              // input table
>                 scan,                  // Scan instance to control CF and attribute selection
>                 DailySumMapper.class,  // mapper class
>                 Text.class,            // mapper output key
>                 Text.class,            // mapper output value
>                 job);
>
>         TableMapReduceUtil.initTableReducerJob(
>                 "DA",                   // output table
>                 DailySumReducer.class,  // reducer class
>                 job);
>
>         //job.setOutputValueClass(Put.class);
>         job.setNumReduceTasks(1);  // at least one, adjust as required
>
>         boolean b = job.waitForCompletion(true);
>         if (!b) {
>             throw new IOException("error with job!");
>         }
>     }
>
>     public static class DailySumMapper extends TableMapper<Text, Text> {
>
>         public void map(ImmutableBytesWritable row, Result value, Mapper.Context context)
>                 throws IOException, InterruptedException {
>             List<String> key = getRowKey(row.get());
>             Text rowKey = new Text(key.get(0));
>             int time = Integer.parseInt(key.get(1));
>             // limiting the time to one day (Aug 04 2012) -- testing only, not a good way
>             if (time <= 1344146400) {
>                 List<KeyValue> data = value.list();
>                 long inbound = 0l;
>                 long outbound = 0l;
>                 for (KeyValue kv : data) {
>                     List<Long> values = getValues(kv.getValue());
>                     if (values.get(0) != -1) {
>                         inbound = inbound + values.get(0);
>                     }
>                     if (values.get(1) != -1) {
>                         outbound = outbound + values.get(1);
>                     }
>                 }
>                 context.write(rowKey, new Text(String.valueOf(inbound) + "-" + String.valueOf(outbound)));
>             }
>         }
>
>         private static List<Long> getValues(byte[] data) {
>             List<Long> values = new ArrayList<Long>();
>             ByteBuffer buffer = ByteBuffer.wrap(data);
>             values.add(buffer.getLong());
>             values.add(buffer.getLong());
>             return values;
>         }
>
>         private static List<String> getRowKey(byte[] key) {
>             List<String> keys = new ArrayList<String>();
>             ByteBuffer buffer = ByteBuffer.wrap(key);
>             StringBuilder sb = new StringBuilder();
>             sb.append(buffer.getInt());
>             sb.append("-");
>             if (key.length == 13) {
>                 sb.append(buffer.getInt());
>                 sb.append("-");
>             }
>             sb.append(buffer.get());
>             keys.add(sb.toString());
>             keys.add(String.valueOf(buffer.getInt()));
>             return keys;
>         }
>     }
>
>     public static class DailySumReducer extends TableReducer<Text, Text, Put> {
>         private int count = 0;
>
>         public void reduce(Text key, Iterable<Text> values, Reducer.Context context)
>                 throws IOException, InterruptedException {
>             long inbound = 0l;
>             long outbound = 0l;
>             for (Text val : values) {
>                 String text = val.toString();
>                 int index = text.indexOf("-");
>                 String in = text.substring(0, index);
>                 String out = text.substring(index + 1, text.length());
>                 inbound = inbound + Long.parseLong(in);
>                 outbound = outbound + Long.parseLong(out);
>             }
>             ByteBuffer data = ByteBuffer.wrap(new byte[16]);
>             data.putLong(inbound);
>             data.putLong(outbound);
>             Put put = new Put(Bytes.toBytes(key.toString() + 20120804));
>             put.add(Bytes.toBytes("t"), Bytes.toBytes("s"), data.array());
>             context.setStatus("Emitting Put " + count++);
>             context.write(key, put);
>         }
>     }
> }
>
> Thanks
> Jothikumar
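
A note on what the stack trace above suggests: the frame at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156) is the base class's identity reduce(), which simply copies the Text values through to the output, and TableOutputFormat then rejects anything that is not a Put or a Delete. That would mean DailySumReducer.reduce() is never being called: because the method mixes the raw type Reducer.Context with the parameterized Iterable<Text>, its signature matches neither the generic reduce() nor its erasure, so it ends up as an overload rather than an override. (The mapper gets away with the raw Mapper.Context because ImmutableBytesWritable and Result are not generic, so its signature happens to match the erasure of map().) Below is a minimal, untested sketch of the reducer with the signature changed so the compiler checks the override; choosing ImmutableBytesWritable as the output key type is an assumption that matches what initTableReducerJob configures, and the column family "t", qualifier "s", and row-key scheme are simply copied from the code above.

    // Sketch only (untested); uses the same imports as the posted code.
    public static class DailySumReducer
            extends TableReducer<Text, Text, ImmutableBytesWritable> {

        private int count = 0;

        @Override  // fails to compile unless this really overrides reduce()
        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            long inbound = 0L;
            long outbound = 0L;
            for (Text val : values) {
                String text = val.toString();
                int index = text.indexOf("-");
                inbound += Long.parseLong(text.substring(0, index));
                outbound += Long.parseLong(text.substring(index + 1));
            }
            // pack the two sums into a 16-byte value, as in the original code
            ByteBuffer data = ByteBuffer.wrap(new byte[16]);
            data.putLong(inbound);
            data.putLong(outbound);
            Put put = new Put(Bytes.toBytes(key.toString() + 20120804));
            put.add(Bytes.toBytes("t"), Bytes.toBytes("s"), data.array());
            context.setStatus("Emitting Put " + count++);
            // TableOutputFormat ignores the key; the Put value is what gets written
            context.write(new ImmutableBytesWritable(put.getRow()), put);
        }
    }

If that is the cause, the driver code should not need to change; adding @Override to the mapper's map() as well would catch the same kind of signature mismatch there.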