Thanks - I switched to using the mapreduce version of DBOutputFormat and
things look a little better, but now I am getting a ClassCastException.
Here is my Writable class:
public class LogRecord implements Writable, DBWritable {

    private long timestamp;
    private String userId;
    private String action;

    public LogRecord() {
    }

    public LogRecord(long timestamp, String userId, String action,
            String pageType, String pageName, String attrPath, String attrName,
            String forEntity, String forEntityInfo, long rendTime) {
        this.timestamp = timestamp;
        this.userId = userId;
        this.action = action;
    }

    public void clearFields() {
        this.timestamp = 0;
        this.userId = "";
        this.action = "";
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((action == null) ? 0 : action.hashCode());
        result = prime * result + (int) (timestamp ^ (timestamp >>> 32));
        result = prime * result + ((userId == null) ? 0 : userId.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        LogRecord other = (LogRecord) obj;
        if (action == null) {
            if (other.action != null)
                return false;
        } else if (!action.equals(other.action))
            return false;
        if (timestamp != other.timestamp)
            return false;
        if (userId == null) {
            if (other.userId != null)
                return false;
        } else if (!userId.equals(other.userId))
            return false;
        return true;
    }

    // Writable: deserialize from the intermediate map output
    @Override
    public void readFields(DataInput in) throws IOException {
        this.timestamp = in.readLong();
        this.userId = Text.readString(in);
        this.action = Text.readString(in);
    }

    // Writable: serialize to the intermediate map output
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeLong(this.timestamp);
        Text.writeString(out, this.userId);
        Text.writeString(out, this.action);
    }

    // DBWritable: read a row from a ResultSet
    @Override
    public void readFields(ResultSet rs) throws SQLException {
        this.timestamp = rs.getLong(1);
        this.userId = rs.getString(2);
        this.action = rs.getString(3);
    }

    // DBWritable: bind the fields to the INSERT statement
    @Override
    public void write(PreparedStatement stmt) throws SQLException {
        stmt.setLong(1, this.timestamp);
        stmt.setString(2, this.userId);
        stmt.setString(3, this.action);
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public void setUserId(String userId) {
        this.userId = userId;
    }

    public void setAction(String action) {
        this.action = action;
    }
}
**************************************
Here is my job runner / configuration code:
// configuration
Configuration conf = new Configuration();
Job job = new Job(conf, "Log Parser Job");

// configure database output
job.setOutputFormatClass(DBOutputFormat.class);
DBConfiguration.configureDB(conf,
        "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "jdbc:sqlserver://..........",
        "...", "...");
String[] fields = {"timestamp", "userId", "action"};
DBOutputFormat.setOutput(job, "LogParser", fields);

// job properties
job.setJarByClass(Driver.class);
job.setMapperClass(LogParserMapper.class);
job.setReducerClass(LogParserReducer.class);
job.setMapOutputKeyClass(LogRecord.class);
job.setMapOutputValueClass(IntWritable.class);
job.setOutputKeyClass(LogRecord.class);
job.setOutputValueClass(NullWritable.class);
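
To be clear about which classes I mean, the driver above is intended to use
the new-API (mapreduce package) imports -- at least that is what I think I am
compiling against:

// assumed imports for the driver code above (new mapreduce API)
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;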
*************
mapper code:
public class LogParserMapper extends Mapper<LongWritable, Text, LogRecord, IntWritable> {

    private LogRecord rec = new LogRecord();
    private final static IntWritable _val = new IntWritable(1);

    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        // parse the line into tokens
        ...
        rec.setUserId(userId);
        rec.setAction("test");
        rec.setTimestamp(0);
        // emit the populated record with a count of 1
        context.write(rec, _val);
    }
}
******************
reducer:
public class LogParserReducer extends Reducer<LogRecord, IntWritable, LogRecord, NullWritable> {

    private NullWritable n = NullWritable.get();

    public void reduce(LogRecord key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        context.write(key, n);
    }
}
******************
Finally, when I run it I am getting this error message:
11/02/04 13:47:55 INFO mapred.JobClient: Task Id : attempt_201101241250_0094_m_000000_1, Status : FAILED
java.lang.ClassCastException: class logparser.model.LogRecord
        at java.lang.Class.asSubclass(Class.java:3018)
        at org.apache.hadoop.mapred.JobConf.getOutputKeyComparator(JobConf.java:796)
        at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.<init>(MapTask.java:809)
        at org.apache.hadoop.mapred.MapTask$NewOutputCollector.<init>(MapTask.java:549)
        at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:631)
        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:315)
        at org.apache.hadoop.mapred.Child$4.run(Child.java:217)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:396)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1063)
        at org.apache.hadoop.mapred.Child.main(Child.java:211)
My Hadoop version is 0.20.2, so I am not sure why it is using the mapred
classes at runtime, or whether that is the problem.
Thanks for your help
On Thu, Feb 3, 2011 at 9:29 PM, Ted Yu <[email protected]> wrote:
> At least in cdh3b2, there are two DBOutputFormat.java:
>
> ./src/mapred/org/apache/hadoop/mapred/lib/db/DBOutputFormat.java
> ./src/mapred/org/apache/hadoop/mapreduce/lib/db/DBOutputFormat.java
>
> You should be able to use the latter.
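>
> For example, just to illustrate the package difference (a sketch -- the
> table and column names are placeholders):
>
>   // old API, configured through a JobConf:
>   //   org.apache.hadoop.mapred.lib.db.DBOutputFormat.setOutput(jobConf, "myTable", "col1", "col2");
>   // new API, configured through the Job object:
>   org.apache.hadoop.mapreduce.lib.db.DBOutputFormat.setOutput(job, "myTable", "col1", "col2");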
>
> On Thu, Feb 3, 2011 at 2:45 PM, Adeel Qureshi <[email protected]> wrote:
>
> > I had started a thread recently to ask questions about custom writable
> > implementations, which is basically similar to this, but that was more of
> > an understanding of the concept; here I wanted to ask about my actual
> > problem and get help on that.
> >
> > I want to be able to read text data line by line in my mapper ..
> > create an instance of a custom writable class that holds some information
> > parsed out of the line ..
> > pass that custom writable along with its count to the reducer ..
> > the reducer then simply needs to insert every single entry into a database ..
> >
> > I am just trying to understand how to accomplish this. Here is what I am
> > thinking I need to do, based on my little understanding of all this custom
> > stuff:
> >
> > 1. create a custom writable class that can hold my parsed records. In my
> > mapper, create a new instance of it using the text line and output the
> > created instance
> > 2. accept this custom writable in the reducer
> > 3. set the reducer output to DBOutputFormat
> >    I tried doing that, and it seems like I am supposed to use the JobConf
> > class, which is deprecated, and the new configuration approach where you
> > are supposed to use the Job object to set the input/output formats doesn't
> > seem to work with DBOutputFormat .. doesn't this DBOutputFormat stuff work
> > with the new Hadoop API?
> >
> > 4. now in the reducer I am confused what to do .. I guess I need to convert
> > my custom writable object to another custom dbwritable object .. that will
> > then be written to the database .. any hints on how to accomplish this ..
> >
> > Sorry if the questions aren't very clear .. I am just really confused about
> > this stuff, and it doesn't help that there is literally NO useful
> > information available anywhere on this writable and dbwritable stuff.
> >
> > Thanks
> > Adeel
> >
>