Your program looks innocuous enough. Does attaching with jmap and getting a '-histo' dump tell you anything? Have you tried upping your JVM heap size (HADOOP_OPTS)? There's a lot going on if you have the FS, MR, and HBase all up and running in the one JVM.
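
For example, something along these lines (the pid and the heap size are just placeholders; this assumes the stock bin/hadoop script, which picks up HADOOP_OPTS, e.g. from conf/hadoop-env.sh):

  jmap -histo <pid-of-the-local-jvm> > histo.txt
  export HADOOP_OPTS=-Xmx512m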

St.Ack


Holger Stenzhorn wrote:
Hi,

I started to use Hadoop MapReduce together with HBase and ran into an exception when running a test case (using the 0.15 release first and then the latest SVN). My usage scenario is the following: I have a text file containing RDF triples (of the form subject-relation-object) with one triple on each line (e.g. "<http://dblp.l3s.de/d2r/resource/publications/books/acm/kim95/AnnevelinkACFHK95> <http://purl.org/dc/elements/1.1/creator> <http://dblp.l3s.de/d2r/resource/authors/Jurgen_Annevelink>"). A given subject can appear several times, each time with a different relation-object combination. Now I want to go through the file, put the subject into the table as the row key and put all triples found for that key into the same row (i.e., abbreviated, triple:0 = "<A> <x> <B>", triple:1 = "<C> <y> <D>", etc.).
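
To illustrate with the triple above (the second column is hypothetical, just to show a subject that occurs twice), a row in the "triples" table should end up looking roughly like this:

  row key: <http://dblp.l3s.de/d2r/resource/publications/books/acm/kim95/AnnevelinkACFHK95>
    triple:0 = "<...AnnevelinkACFHK95> <...creator> <...Jurgen_Annevelink>"
    triple:1 = the next triple found with this same subject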

So I implemented the class attached below for this purpose and (for testing purposes) ran it locally: It works fine up to 44848 lines in my current test file. If I add more lines (i.e. triples) then the whole thing crashes with the following exception. Now, are there ways around this (other than splitting the file up into smaller files)? ...or is this actually a bug in Hadoop?

07/11/06 22:44:21 INFO jvm.JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
07/11/06 22:44:21 WARN mapred.JobClient: No job jar file set. User classes may not be found. See JobConf(Class) or JobConf#setJar(String).
07/11/06 22:44:21 INFO mapred.FileInputFormat: Total input paths to process : 1
07/11/06 22:44:21 INFO mapred.JobClient: Running job: job_local_1
07/11/06 22:44:21 INFO mapred.MapTask: numReduceTasks: 1
07/11/06 22:44:22 INFO mapred.JobClient:  map 0% reduce 0%
07/11/06 22:44:23 WARN mapred.LocalJobRunner: job_local_1
java.lang.OutOfMemoryError: Java heap space
       at java.util.Arrays.copyOf(Arrays.java:2786)
       at java.io.ByteArrayOutputStream.write(ByteArrayOutputStream.java:94)
       at java.io.DataOutputStream.write(DataOutputStream.java:90)
       at org.apache.hadoop.io.Text.write(Text.java:243)
       at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.collect(MapTask.java:347)
       at TriplesTest$TriplesTestMapper.map(TriplesTest.java:41)
       at TriplesTest$TriplesTestMapper.map(TriplesTest.java:32)
       at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
       at org.apache.hadoop.mapred.MapTask.run(MapTask.java:192)
       at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:132)
Exception in thread "main" java.io.IOException: Job failed!
       at org.apache.hadoop.mapred.JobClient.runJob(JobClient.java:831)
       at TriplesTest.run(TriplesTest.java:106)
       at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:65)
       at TriplesTest.main(TriplesTest.java:112)

Cheers,
Holger

TriplesTest.java:
-----------------

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class TriplesTest extends Configured implements Tool {

  // Emits the triple's subject (everything up to and including the first ">")
  // as the key and the whole triple line as the value.
  public static class TriplesTestMapper extends MapReduceBase
    implements Mapper<LongWritable, Text, Text, Text> {
    public void map(LongWritable key, Text value,
                    OutputCollector<Text, Text> output,
                    Reporter reporter) throws IOException {
      String[] temp = value.toString().split(">\\s+<");
      if (temp.length == 3) {
        output.collect(new Text(temp[0] + ">"), value);
      }
    }
  }

  // Puts every triple of a subject into its own "triple:<n>" column of the
  // same row. Note: getBytes() returns the Text's backing buffer, which can
  // be longer than getLength() (and may be reused by the framework), so the
  // valid bytes are copied out before wrapping them.
  public static class TriplesTestReducer extends MapReduceBase
    implements Reducer<Text, Text, Text, MapWritable> {
    public void reduce(Text key, Iterator<Text> values,
                       OutputCollector<Text, MapWritable> output,
                       Reporter reporter) throws IOException {
      int i = 0;
      while (values.hasNext()) {
        Text value = values.next();
        byte[] bytes = new byte[value.getLength()];
        System.arraycopy(value.getBytes(), 0, bytes, 0, value.getLength());
        MapWritable newValue = new MapWritable();
        newValue.put(new Text("triple:" + i++),
          new ImmutableBytesWritable(bytes));
        output.collect(key, newValue);
      }
    }
  }

  public int run(String[] args) throws Exception {
    JobConf jobConf = new JobConf(getConf(), TriplesTest.class);
    jobConf.setJobName("triples");

    jobConf.setMapperClass(TriplesTestMapper.class);
    jobConf.setReducerClass(TriplesTestReducer.class);

    jobConf.setInputPath(new Path("c:/development/test/input"));
    jobConf.setInputFormat(TextInputFormat.class);
    jobConf.setOutputFormat(TableOutputFormat.class);

    jobConf.setOutputKeyClass(Text.class);
    jobConf.set("hbase.mapred.outputtable", "triples");
    jobConf.set("hbase.master", "local");

    JobClient.runJob(jobConf);
    return 0;
  }

  public static void main(String[] args) throws Exception {
    int res = ToolRunner.run(new Configuration(), new TriplesTest(), args);
    System.exit(res);
  }

}
