Hi,

I am kind of stuck on this one, I read all the other similar issues and
coded based on that. But still i get this error.

Any help or clue will help me moving forward.

Thanks




On Mon, Sep 10, 2012 at 7:06 PM, Jothikumar Ekanath <kbmku...@gmail.com>wrote:

> Hi,
>        Getting this error while using hbase as a sink.
>
>
> Error
> java.io.IOException: Pass a Delete or a Put
>         at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:125)
>         at
> org.apache.hadoop.hbase.mapreduce.TableOutputFormat$TableRecordWriter.write(TableOutputFormat.java:84)
>         at
> org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:587)
>         at
> org.apache.hadoop.mapreduce.TaskInputOutputContext.write(TaskInputOutputContext.java:80)
>         at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:156)
>         at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
>         at
> org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:649)
>         at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:417)
>         at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at javax.security.auth.Subject.doAs(Subject.java:396)
>         at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1121)
>         at org.apache.hadoop.mapred.Child.main(Child.java:249)
>
>
>
>  Below is my code
> Using the following version
>
> Hbase = 0.94
> Hadoop - 1.0.3
>
> import org.apache.hadoop.conf.Configuration;
> import org.apache.hadoop.hbase.HBaseConfiguration;
> import org.apache.hadoop.hbase.KeyValue;
> import org.apache.hadoop.hbase.client.Put;
> import org.apache.hadoop.hbase.client.Result;
> import org.apache.hadoop.hbase.client.Scan;
> import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
> import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
> import org.apache.hadoop.hbase.mapreduce.TableMapper;
> import org.apache.hadoop.hbase.mapreduce.TableReducer;
> import org.apache.hadoop.hbase.util.Bytes;
> import org.apache.hadoop.io.Text;
> import org.apache.hadoop.mapreduce.Job;
> import org.apache.hadoop.mapreduce.*;
>
> import java.io.IOException;
> import java.nio.ByteBuffer;
> import java.util.ArrayList;
> import java.util.List;
>
> public class DailyAggMapReduce {
>
>     public static void main(String args[]) throws Exception {
>         Configuration config = HBaseConfiguration.create();
>         Job job = new Job(config, "DailyAverageMR");
>         job.setJarByClass(DailyAggMapReduce.class);
>         Scan scan = new Scan();
>         // 1 is the default in Scan, which will be bad for MapReduce jobs
>         scan.setCaching(500);
>         // don't set to true for MR jobs
>         scan.setCacheBlocks(false);
>
>         TableMapReduceUtil.initTableMapperJob(
>                 "HTASDB",        // input table
>                 scan,               // Scan instance to control CF and
> attribute selection
>                 DailySumMapper.class,     // mapper class
>                 Text.class,         // mapper output key
>                 Text.class,  // mapper output value
>                 job);
>
>         TableMapReduceUtil.initTableReducerJob(
>                 "DA",        // output table
>                 DailySumReducer.class,    // reducer class
>                 job);
>
>         //job.setOutputValueClass(Put.class);
>         job.setNumReduceTasks(1);   // at least one, adjust as required
>
>         boolean b = job.waitForCompletion(true);
>         if (!b) {
>             throw new IOException("error with job!");
>         }
>
>     }
>
>
>     public static class DailySumMapper extends TableMapper<Text, Text> {
>
>         public void map(ImmutableBytesWritable row, Result value,
> Mapper.Context context) throws IOException, InterruptedException {
>             List<String> key = getRowKey(row.get());
>             Text rowKey = new Text(key.get(0));
>             int time = Integer.parseInt(key.get(1));
>             //limiting the time for one day (Aug 04 2012) -- Testing, Not
> a good way
>             if (time <= 1344146400) {
>                 List<KeyValue> data = value.list();
>                 long inbound = 0l;
>                 long outbound = 0l;
>                 for (KeyValue kv : data) {
>                     List<Long> values = getValues(kv.getValue());
>                     if (values.get(0) != -1) {
>                         inbound = inbound + values.get(0);
>                     }
>                     if (values.get(1) != -1) {
>                         outbound = outbound + values.get(1);
>                     }
>                 }
>                 context.write(rowKey, new Text(String.valueOf(inbound) +
> "-" + String.valueOf(outbound)));
>             }
>         }
>
>         private static List<Long> getValues(byte[] data) {
>             List<Long> values = new ArrayList<Long>();
>             ByteBuffer buffer = ByteBuffer.wrap(data);
>             values.add(buffer.getLong());
>             values.add(buffer.getLong());
>             return values;
>         }
>
>         private static List<String> getRowKey(byte[] key) {
>             List<String> keys = new ArrayList<String>();
>             ByteBuffer buffer = ByteBuffer.wrap(key);
>             StringBuilder sb = new StringBuilder();
>             sb.append(buffer.getInt());
>             sb.append("-");
>             if (key.length == 13) {
>                 sb.append(buffer.getInt());
>                 sb.append("-");
>             }
>             sb.append(buffer.get());
>             keys.add(sb.toString());
>             keys.add(String.valueOf(buffer.getInt()));
>             return keys;
>         }
>     }
>
>     public static class DailySumReducer extends TableReducer<Text, Text,
> Put> {
>         private int count = 0;
>         public void reduce(Text key, Iterable<Text> values,
> Reducer.Context context) throws IOException, InterruptedException {
>             long inbound = 0l;
>             long outbound = 0l;
>             for (Text val : values) {
>                 String text = val.toString();
>                 int index = text.indexOf("-");
>                 String in = text.substring(0,index);
>                 String out = text.substring(index+1,text.length());
>                 inbound = inbound + Long.parseLong(in);
>                 outbound = outbound + Long.parseLong(out);
>             }
>             ByteBuffer data = ByteBuffer.wrap(new byte[16]);
>             data.putLong(inbound);
>             data.putLong(outbound);
>             Put put = new Put(Bytes.toBytes(key.toString()+20120804));
>             put.add(Bytes.toBytes("t"), Bytes.toBytes("s"),data.array());
>             context.setStatus("Emitting Put " + count++);
>             context.write(key, put);
>         }
>     }
> }
>
> Thanks
> Jothikumar
>
>

Reply via email to