Johannes Rußek
Thu, 18 Mar 2010 13:29:04 -0700
Hello Dmitriy, so I've implemented the wrapper using a public static LookupService to store the GeoIP object. Is this what you thought of when you said "static service"? Attached you can find the UDF; it's very simple so far and only resolves to the country.
I'm not sure if I'm handling the errors correctly so far. I was able to build and run it with Pig 0.6.0 on a few sample datasets and it did work. I'm running it on 40G of input data now, let's see.
Thanks and regards, Johannes
package myudf; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.TimeZone;
import org.apache.pig.EvalFunc;
import org.apache.pig.FuncSpec;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.util.WrappedIOException;
import com.maxmind.geoip.*;
/**
 * Pig UDF that maps an IP address (or host name) given as a chararray to the
 * country code reported by a MaxMind GeoIP database.
 *
 * <p>The {@link LookupService} is kept in a static field so that it is opened
 * at most once per JVM and shared by every instance of this UDF. The first
 * database path that opens successfully wins; paths passed to later instances
 * are ignored while the shared service is set.
 *
 * <p>No synchronization is performed around the shared field.
 * NOTE(review): this assumes a single thread per JVM calls the UDF - confirm.
 */
public class IP2Country extends EvalFunc<String> {

    /** Shared GeoIP lookup service; {@code null} until a database has been opened. */
    public static LookupService iplookupService;

    /** Database path used by the no-argument constructor. */
    private static final String DEFAULT_LOCATION = "/var/lib/GeoIP/GeoIP.dat";

    /** Database path of this instance, kept so that {@link #exec} can retry opening it. */
    private final String geoIpFile;

    /**
     * Creates the UDF using the database at the default location.
     */
    public IP2Country() {
        this(DEFAULT_LOCATION);
    }

    /**
     * Creates the UDF using the database at the given path.
     *
     * <p>If no shared service exists yet, an attempt is made to open the
     * database right away. A failure is only reported on stderr here, so that
     * constructing the UDF never fails; {@link #exec} retries the open and
     * raises a descriptive error if the database is still unavailable.
     *
     * @param GeoIPFILE path of the GeoIP database file
     */
    public IP2Country(String GeoIPFILE) {
        this.geoIpFile = GeoIPFILE;
        if (iplookupService == null) {
            try {
                iplookupService = openLookupService(GeoIPFILE);
            } catch (IOException ie) {
                System.err.println("myudf.IP2Country was unable to open " +
                        GeoIPFILE);
            }
        }
    }

    /**
     * Opens a lookup service for the given database file with the same
     * options for every caller.
     *
     * @param path path of the GeoIP database file
     * @return the opened lookup service
     * @throws IOException if the database cannot be opened
     */
    private static LookupService openLookupService(String path) throws IOException {
        return new LookupService(path,
                LookupService.GEOIP_MEMORY_CACHE | LookupService.GEOIP_CHECK_CACHE);
    }

    /**
     * Resolves the first field of the tuple to a country code.
     *
     * @param input tuple whose first field is the address to look up
     * @return the country code from the lookup service, or {@code null} when
     *         the tuple is null or empty, the first field is null, or
     *         reading the field raises an {@link IOException}
     * @throws IOException if the GeoIP database cannot be opened, or if any
     *         other exception occurs while processing the row
     */
    @Override
    public String exec(Tuple input) throws IOException {
        if (input == null || input.size() == 0) {
            return null;
        }

        // The constructor may have failed to open the database. Retry here,
        // outside the per-row try block, so the failure surfaces with the file
        // name and cause instead of a NullPointerException on every row.
        if (iplookupService == null) {
            try {
                iplookupService = openLookupService(geoIpFile);
            } catch (IOException ie) {
                throw WrappedIOException.wrap(
                        "myudf.IP2Country was unable to open " + geoIpFile, ie);
            }
        }

        String str = "";
        try {
            str = (String) input.get(0);
            if (str == null) {
                // A null field has no address to look up.
                return null;
            }
            return iplookupService.getCountry(str).getCode();
        } catch (IOException ie) {
            System.err.println("myudf.IP2Country was unable to parse the ip " +
                    str);
            return null;
        } catch (Exception e) {
            throw WrappedIOException.wrap("Caught exception processing input row ",
                    e);
        }
    }

    /**
     * Declares the single supported signature: one chararray argument.
     *
     * @return a one-element list mapping a chararray argument to this class
     * @throws FrontendException if the schema cannot be constructed
     */
    @Override
    public List<FuncSpec> getArgToFuncMapping() throws FrontendException {
        List<FuncSpec> funcList = new ArrayList<FuncSpec>();
        funcList.add(new FuncSpec(this.getClass().getName(),
                new Schema(new Schema.FieldSchema(null, DataType.CHARARRAY))));
        return funcList;
    }
}