Hi,
I am writing a UDAF which returns the top x results per key. Let's say my
input is:
key attribute count
1 1 6
1 2 5
1 3 4
2 1 8
2 2 4
2 3 1
I want the top 2 results per key, which would be:
key attribute count
1 1 6
1 2 5
2 1 8
2 2 4
I have written a UDAF for this in the attached file. However, when I run the
code, I get the exception:
FAILED: Unknown exception :
org.apache.hadoop.hive.serde2.objectinspector.primitive.JavaIntObjectInspector
cannot be cast to
org.apache.hadoop.hive.serde2.objectinspector.primitive.SettableIntObjectInspector
Can anyone please let me know what I could be doing wrong?
Thanks and Regards,
Sonal
package org.apache.hadoop.hive.udaf;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
/**
 * Hive UDAF that collects, for every group key, the rows with the highest
 * counts (the "top x" rows per key).
 *
 * <p>Each input row is a (limit, key, attribute, count) tuple; the result is
 * one list of {@link Count} rows per key, holding at most {@code limit} rows
 * ordered from highest to lowest count.
 *
 * <p>NOTE(review): {@code terminatePartial()} returns a
 * {@code Collection<ArrayList<Count>>} while {@code merge()} accepts a
 * {@code HashMap<Integer, ArrayList<Count>>}. Hive's UDAF examples use the
 * same type for both; the signatures are kept as they were to avoid changing
 * the public interface, but this should be confirmed against the Hive
 * version in use.
 */
public class TopXPerGroup extends UDAF {

    /**
     * One input row: a group key, an attribute and the count used for ranking.
     *
     * <p>The natural ordering is ascending by count, with a null count ordered
     * before every non-null count. The ordering looks only at the count, so it
     * is not consistent with {@code equals}.
     */
    public static class Count implements Comparable {
        private Integer key;
        private Integer attribute;
        private Integer count;

        /**
         * @param key       group key the row belongs to
         * @param attribute attribute value of the row
         * @param count     count the row is ranked by
         */
        public Count(Integer key, Integer attribute, Integer count) {
            this.key = key;
            this.count = count;
            this.attribute = attribute;
        }

        public Integer getKey() {
            return key;
        }

        public void setKey(Integer key) {
            this.key = key;
        }

        public Integer getCount() {
            return count;
        }

        public void setCount(Integer count) {
            this.count = count;
        }

        public Integer getAttribute() {
            return attribute;
        }

        public void setAttribute(Integer attribute) {
            this.attribute = attribute;
        }

        /**
         * Compares by count, ascending.
         *
         * @param to object to compare with
         * @return 1 when {@code to} is null or not a {@code Count}; otherwise
         *         -1, 0 or 1 as this count is lower than, equal to or higher
         *         than the other count (null counts rank lowest)
         */
        @Override
        public int compareTo(Object to) {
            if ((to == null) || (to.getClass() != getClass())) {
                return 1;
            }
            Integer other = ((Count) to).count;
            if (count == null) {
                return (other == null) ? 0 : -1;
            }
            if (other == null) {
                return 1;
            }
            // Compare by value: == on boxed Integers compares references.
            return count.compareTo(other);
        }

        /** @return the row formatted as {@code key,attribute,count} */
        @Override
        public String toString() {
            return key + "," + attribute + "," + count;
        }
    }

    /**
     * Evaluator that keeps the top rows per key.
     *
     * <p>NOTE(review): declared static, and holding its own state, so that it
     * can be created through its no-argument constructor without an enclosing
     * {@code TopXPerGroup} instance; Hive's bundled UDAF examples declare
     * their evaluators the same way - confirm for the Hive version in use.
     */
    public static class TopXPerGroupIntEvaluator implements UDAFEvaluator {

        /** Rows retained so far, per group key. */
        private HashMap<Integer, ArrayList<Count>> countPerGroup =
                new HashMap<Integer, ArrayList<Count>>();

        /** Most recent limit passed to {@link #iterate}; null until a row is seen. */
        private Integer max;

        public TopXPerGroupIntEvaluator() {
            super();
        }

        /**
         * Discards all retained rows. The last seen limit is kept so that a
         * later {@code merge} can still trim its result.
         */
        public void init() {
            countPerGroup = new HashMap<Integer, ArrayList<Count>>();
        }

        /**
         * Offers one row to the aggregation.
         *
         * @param max       maximum number of rows to keep per key; a row with a
         *                  null limit is ignored, a limit of zero or less keeps
         *                  nothing
         * @param groupBy   group key of the row
         * @param attribute attribute value of the row
         * @param count     count the row is ranked by; null ranks lowest
         * @return always true
         */
        public boolean iterate(Integer max, Integer groupBy, Integer attribute, Integer count) {
            if (max == null) {
                return true;
            }
            this.max = max;
            int limit = max.intValue();
            if (limit <= 0) {
                return true;
            }

            ArrayList<Count> counts = countPerGroup.get(groupBy);
            if (counts == null) {
                counts = new ArrayList<Count>();
                countPerGroup.put(groupBy, counts);
            }
            counts.add(new Count(groupBy, attribute, count));
            // Input order is arbitrary, so rank the rows and drop the lowest
            // one instead of simply keeping the first rows seen.
            if (counts.size() > limit) {
                keepTop(counts, limit);
            }
            return true;
        }

        /** @return a copy of the retained rows, one list per key, highest count first */
        public Collection<ArrayList<Count>> terminatePartial() {
            return snapshot();
        }

        /**
         * Folds another partial result into this evaluator.
         *
         * <p>NOTE(review): the per-key limit is not part of the partial result.
         * When this evaluator has not seen a row through {@code iterate} the
         * limit is unknown and the merged rows are kept untrimmed.
         *
         * @param merge rows per key from another evaluator; null is ignored
         * @return always true
         */
        public boolean merge(HashMap<Integer, ArrayList<Count>> merge) {
            if (merge == null) {
                return true;
            }
            for (Integer mergeKey : merge.keySet()) {
                ArrayList<Count> fromMerge = merge.get(mergeKey);
                if ((fromMerge == null) || fromMerge.isEmpty()) {
                    continue;
                }
                ArrayList<Count> fromThis = countPerGroup.get(mergeKey);
                if (fromThis == null) {
                    // Own list, so later changes never touch the caller's data.
                    fromThis = new ArrayList<Count>();
                    countPerGroup.put(mergeKey, fromThis);
                }
                fromThis.addAll(fromMerge);
                if (max != null) {
                    keepTop(fromThis, max.intValue());
                }
            }
            return true;
        }

        /** @return a copy of the retained rows, one list per key, highest count first */
        public Collection<ArrayList<Count>> terminate() {
            return snapshot();
        }

        /** Sorts {@code counts} from highest to lowest count and truncates it to {@code max} rows. */
        private static void keepTop(ArrayList<Count> counts, int max) {
            Collections.sort(counts, Collections.<Count>reverseOrder());
            int keep = Math.max(0, max);
            if (counts.size() > keep) {
                counts.subList(keep, counts.size()).clear();
            }
        }

        /** Copies every group into a fresh list sorted from highest to lowest count. */
        private Collection<ArrayList<Count>> snapshot() {
            ArrayList<ArrayList<Count>> result =
                    new ArrayList<ArrayList<Count>>(countPerGroup.size());
            for (ArrayList<Count> group : countPerGroup.values()) {
                ArrayList<Count> copy = new ArrayList<Count>(group);
                Collections.sort(copy, Collections.<Count>reverseOrder());
                result.add(copy);
            }
            return result;
        }
    }
}