Hi,

I am writing a UDAF which returns the top x results per key. Let's say my
input is:

key attribute count
1      1            6
1      2            5
1      3            4
2      1            8
2      2            4
2      3            1

I want the top 2 results per key, ranked by count in descending order. The expected output is:

key attribute count
1      1            6
1      2            5
2      1            8
2      2            4

I have written a UDAF for this in the attached file. However, when I run the
code, I get the exception:
FAILED: Unknown exception :
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector
cannot be cast to
org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableIntObjectInspector


Can anyone please let me know what I could be doing wrong?
Thanks and Regards,
Sonal
package org.apache.hadoop.hive.udaf;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;

import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;

/**
 * Hive UDAF that keeps, for every grouping key, the rows with the highest
 * counts ("top x per key").
 *
 * The aggregation logic lives in {@link TopXPerGroupIntEvaluator}; {@link Count}
 * is the row holder that the evaluator collects.
 */
public class TopXPerGroup extends UDAF {

	/**
	 * One (key, attribute, count) row.
	 *
	 * The natural ordering is ascending by {@code count} only, so it is not
	 * consistent with {@code equals} (which is still identity-based): two
	 * distinct rows with the same count compare as 0.
	 */
	public static class Count implements Comparable {

		private Integer key;
		private Integer attribute;
		private Integer count;

		/**
		 * @param key       grouping key the row belongs to
		 * @param attribute attribute value of the row
		 * @param count     count used for ranking
		 */
		public Count(Integer key, Integer attribute, Integer count) {
			System.out.println("Creating count with " + key + " " +  attribute  + " " +count);
			this.key = key;
			this.count = count;
			this.attribute = attribute;
		}

		public Integer getKey() {
			return key;
		}

		public void setKey(Integer key) {
			this.key = key;
		}

		public Integer getCount() {
			return count;
		}

		public void setCount(Integer count) {
			this.count = count;
		}

		public Integer getAttribute() {
			return attribute;
		}

		public void setAttribute(Integer attribute) {
			this.attribute = attribute;
		}

		/**
		 * Orders rows ascending by count.
		 *
		 * @param to object to compare with
		 * @return 1 when {@code to} is null or not exactly a {@code Count};
		 *         otherwise a negative, zero or positive value when this count
		 *         is lower than, equal to or higher than the other count. A
		 *         null count sorts before any non-null count.
		 */
		@Override
		public int compareTo(Object to) {
			System.out.println("Comparing with " + to);
			if ((to == null) || (to.getClass() != getClass())) {
				return 1;
			}
			Integer other = ((Count) to).count;
			if (count == null) {
				return (other == null) ? 0 : -1;
			}
			if (other == null) {
				return 1;
			}
			// Compare by value: "==" on boxed Integers compares references and
			// is only reliable for small cached values.
			return count.compareTo(other);
		}

		/** @return the row formatted as {@code key,attribute,count}. */
		@Override
		public String toString() {
			return key + "," + attribute + "," + count;
		}
	}

	/**
	 * Evaluator that accumulates, per grouping key, at most {@code max} rows
	 * with the highest counts.
	 *
	 * NOTE(review): declared static and owning its own state so that it can be
	 * created through its no-arg constructor without an enclosing
	 * {@code TopXPerGroup} instance, and so that evaluators do not share a map
	 * - confirm this matches how the Hive version in use instantiates
	 * evaluators.
	 *
	 * NOTE(review): {@link #terminatePartial()} returns a {@code Collection}
	 * while {@link #merge(HashMap)} accepts a {@code HashMap}; Hive presumably
	 * needs these two types to agree, and the partial result does not carry the
	 * limit. Both signatures are left untouched here - confirm and redesign the
	 * partial type if required.
	 */
	public static class TopXPerGroupIntEvaluator implements UDAFEvaluator {
		/** Rows kept so far, per grouping key, ordered by descending count. */
		private HashMap<Integer, ArrayList<Count>> countPerGroup;
		/** Limit from the most recent {@code iterate} call; null until one is seen. */
		private Integer max;

		public TopXPerGroupIntEvaluator() {
			super();
			init();
		}

		/** Resets the evaluator to an empty state with no known limit. */
		public void init() {
			countPerGroup = new HashMap<Integer, ArrayList<Count>>();
			max = null;
		}

		/**
		 * Offers one input row to the aggregation.
		 *
		 * @param max       number of rows to keep per key; a null value leaves
		 *                  the row ignored, a value below 1 keeps nothing
		 * @param groupBy   grouping key of the row
		 * @param attribute attribute value of the row
		 * @param count     ranking value; rows with a null count are ignored
		 * @return always true
		 */
		public boolean iterate(Integer max, Integer groupBy, Integer attribute, Integer count) {
			System.out.println("Iterating for top" );
			if (max != null) {
				this.max = max;
				if ((count != null) && (max > 0)) {
					ArrayList<Count> counts = countPerGroup.get(groupBy);
					if (counts == null) {
						counts = new ArrayList<Count>();
						countPerGroup.put(groupBy, counts);
					}
					// Always consider the new row and drop the weakest one,
					// instead of keeping whichever rows happened to arrive first.
					counts.add(new Count(groupBy, attribute, count));
					keepTop(counts, max);
				}
			}
			System.out.println("End Iterating for top" );
			return true;
		}

		/** @return the per-key row lists accumulated so far (a view of the internal map). */
		public Collection<ArrayList<Count>> terminatePartial() {
			return countPerGroup.values();
		}

		/**
		 * Folds another partial result into this evaluator.
		 *
		 * The argument and its lists are never modified or retained. When this
		 * evaluator has not seen a limit yet (no {@code iterate} call), the
		 * merged rows are sorted but not truncated, because the limit is not
		 * part of the partial result.
		 *
		 * @param merge partial result keyed by grouping key; null is ignored
		 * @return always true
		 */
		public boolean merge(HashMap<Integer, ArrayList<Count>> merge) {
			System.out.println("Mergoing");
			if (merge == null) {
				return true;
			}
			if (countPerGroup == null) {
				countPerGroup = new HashMap<Integer, ArrayList<Count>>();
			}
			for (Integer mergeKey : merge.keySet()) {
				ArrayList<Count> fromMerge = merge.get(mergeKey);
				if (fromMerge == null) {
					continue;
				}
				// Build a fresh list so neither side's list is aliased or mutated.
				ArrayList<Count> combined = new ArrayList<Count>();
				ArrayList<Count> fromThis = countPerGroup.get(mergeKey);
				if (fromThis != null) {
					combined.addAll(fromThis);
				}
				combined.addAll(fromMerge);
				keepTop(combined, max);
				countPerGroup.put(mergeKey, combined);
			}
			return true;
		}

		/**
		 * Sorts {@code rows} by descending count and truncates it to
		 * {@code limit} entries.
		 *
		 * @param rows  list to sort and trim in place
		 * @param limit maximum number of rows to keep; null means no truncation
		 */
		private static void keepTop(ArrayList<Count> rows, Integer limit) {
			Collections.sort(rows, Collections.<Count>reverseOrder());
			if (limit == null) {
				return;
			}
			int keep = Math.max(limit.intValue(), 0);
			if (rows.size() > keep) {
				// Clearing the tail view removes the lowest-ranked rows in place.
				rows.subList(keep, rows.size()).clear();
			}
		}

		/** @return the per-key row lists accumulated so far (a view of the internal map). */
		public Collection<ArrayList<Count>> terminate() {
			return countPerGroup.values();
		}

	}//class
}

Reply via email to