Your program looks innocuous enough. Does attaching with jmap and
getting a '-histo' dump tell you anything? Have you tried upping your
JVM heap size (HADOOP_OPTS)? There's a lot going on if you have the FS,
MR, and HBase all up and running in the one JVM.
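For example, something like the following (the pid and the heap size
are placeholders only; adjust them for your setup):

    jmap -histo <pid-of-the-hadoop-jvm>
    export HADOOP_OPTS="-Xmx1024m"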
St.Ack
Holger Stenzhorn wrote:
Hi,
I started to use Hadoop MapReduce together with HBase and am
experiencing an exception when running a test case (using 0.15 first
and then the latest SVN).
My usage scenario is the following: I have a text file containing RDF
triples (of the form subject-relation-object) with one triple on each
line, e.g.
"<http://dblp.l3s.de/d2r/resource/publications/books/acm/kim95/AnnevelinkACFHK95>
<http://purl.org/dc/elements/1.1/creator>
<http://dblp.l3s.de/d2r/resource/authors/Jurgen_Annevelink>". A given
subject can appear several times with a different relation-object
combination.
Now I want to go through the file and put the subject into the table as
the row key, with all triples found for that key in the same row
(abbreviated: triple:0 = "<A> <x> <B>", triple:1 = "<C> <y> <D>", etc.).
So I implemented the class attached below for this purpose and (for
testing purposes) ran it locally: it works fine up to 44848 lines in
my current test file. If I add more lines (i.e. triples), the whole
thing crashes with the following exception. Now, are there ways around
this (other than splitting the file into smaller files)? ...or is this
actually a bug in Hadoop?
07/11/06 22:44:21 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
07/11/06 22:44:21 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
07/11/06 22:44:21 INFO mapred.FileInputFormat: Total input paths to process : 1
07/11/06 22:44:21 INFO mapred.JobClient: Running job: job_local_1
07/11/06 22:44:21 INFO mapred.MapTask: numReduceTasks: 1
07/11/06 22:44:22 INFO mapred.JobClient: map 0% reduce 0%
07/11/06 22:44:23 WARN mapred.LocalJobRunner: job_local_1
java.lang.OutOfMemoryError: Java heap space
    at java.util.Arrays.copyOf(Arrays.java:2786)
    at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.io.Text.write(Text.java:243)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:347)
    at TriplesTest$TriplesTestMapper.map(TriplesTest.java:41)
    at TriplesTest$TriplesTestMapper.map(TriplesTest.java:32)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:192)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:132)
Exception in thread "main" java.io.IOException: Job failed!
    at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:831)
    at TriplesTest.run(TriplesTest.java:106)
    at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
    at TriplesTest.main(TriplesTest.java:112)
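One knob that might help with the heap question, sketched with the
caveat that it only applies when the job runs on a real cluster (under
the LocalJobRunner everything runs in the client JVM, so there the
client's own -Xmx, e.g. via HADOOP_OPTS, is what matters):

    // Hedged sketch: on a cluster each task runs in a child JVM whose heap
    // is governed by mapred.child.java.opts (default -Xmx200m in this era),
    // so the map-side output buffer gets more room with something like:
    jobConf.set("mapred.child.java.opts", "-Xmx512m");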
Cheers,
Holger
TriplesTest.java:
-----------------
import java.io.IOException;
import java.util.Iterator;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TriplesTest extends Configured implements Tool {

    /** Emits the triple's subject as key and the whole triple line as value. */
    public static class TriplesTestMapper extends MapReduceBase
            implements Mapper<LongWritable, Text, Text, Text> {

        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output,
                Reporter reporter) throws IOException {
            // Split "<subject> <relation> <object>" at the "> <" boundaries.
            String[] temp = value.toString().split(">\\s+<");
            if (temp.length == 3) {
                // temp[0] lost its closing '>' in the split; restore it.
                output.collect(new Text(temp[0] + ">"), value);
            }
        }
    }

    /** Writes each triple of a subject into a "triple:<n>" cell of its row. */
    public static class TriplesTestReducer extends MapReduceBase
            implements Reducer<Text, Text, Text, MapWritable> {

        public void reduce(Text key, Iterator<Text> values,
                OutputCollector<Text, MapWritable> output,
                Reporter reporter) throws IOException {
            int i = 0;
            while (values.hasNext()) {
                Text value = values.next();
                // Text.getBytes() returns the backing array, which may be
                // longer than the content; copy only the valid bytes.
                byte[] bytes = new byte[value.getLength()];
                System.arraycopy(value.getBytes(), 0, bytes, 0, value.getLength());
                MapWritable newValue = new MapWritable();
                newValue.put(new Text("triple:" + i++),
                        new ImmutableBytesWritable(bytes));
                output.collect(key, newValue);
            }
        }
    }

    public int run(String[] args) throws Exception {
        JobConf jobConf = new JobConf(getConf(), TriplesTest.class);
        jobConf.setJobName("triples");
        jobConf.setMapperClass(TriplesTestMapper.class);
        jobConf.setReducerClass(TriplesTestReducer.class);
        jobConf.setInputPath(new Path("c:/development/test/input"));
        jobConf.setInputFormat(TextInputFormat.class);
        jobConf.setOutputFormat(TableOutputFormat.class);
        jobConf.setOutputKeyClass(Text.class);
        // The reducer emits MapWritable values; the map output stays Text.
        jobConf.setOutputValueClass(MapWritable.class);
        jobConf.setMapOutputValueClass(Text.class);
        jobConf.set("hbase.mapred.outputtable", "triples");
        jobConf.set("hbase.master", "local");
        JobClient.runJob(jobConf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new TriplesTest(), args);
        System.exit(res);
    }
}
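As a hedged aside (a sketch only, not tested against 0.15): since the
intent is one row per subject, the reducer could also accumulate every
triple into a single MapWritable and call collect() once per key, at
the cost of holding all of a subject's triples in memory at once:

    public void reduce(Text key, Iterator<Text> values,
            OutputCollector<Text, MapWritable> output,
            Reporter reporter) throws IOException {
        MapWritable row = new MapWritable();
        int i = 0;
        while (values.hasNext()) {
            Text value = values.next();
            // Copy only the valid bytes; Text.getBytes() may return a
            // backing array longer than the actual content.
            byte[] bytes = new byte[value.getLength()];
            System.arraycopy(value.getBytes(), 0, bytes, 0, value.getLength());
            row.put(new Text("triple:" + i++), new ImmutableBytesWritable(bytes));
        }
        // One collect() per subject writes all triple:<n> cells together.
        output.collect(key, row);
    }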