Guys,

I have a SequenceFile with LongWritable keys and Text values. I am using SequenceFileSource with MRPipeline. But when I use MemPipeline, it throws the following exception.

3503 [main] INFO  com.cloudera.crunch.io.seq.SeqFileReaderFactory  - Error 
reading from path: file:/home/rahul/software/crunch/sampleFile
java.io.IOException: wrong key class: org.apache.hadoop.io.ObjectWritable is 
not class org.apache.hadoop.io.LongWritable
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1895)
    at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1947)
    at 
com.cloudera.crunch.io.seq.SeqFileReaderFactory$1.hasNext(SeqFileReaderFactory.java:68)
    at 
com.cloudera.crunch.io.CompositePathIterable$2.hasNext(CompositePathIterable.java:81)

Now this is due to the fact that the file contains LongWritable keys, but a NullWritable is being used to read them. The error occurs only in MemPipeline; it works in MRPipeline because there the key class is passed via Hadoop's MapContext and is therefore the correct one. I modified SeqFileReaderFactory to pass the key class as well, but is this the correct way of doing so?

regards
Rahul
package com.mylearning.crunch;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;

import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mem.MemPipeline;
import com.cloudera.crunch.io.seq.SeqFileSource;
import com.cloudera.crunch.lib.Aggregate;
import com.cloudera.crunch.types.writable.WritableTypeFamily;
import com.cloudera.crunch.types.writable.Writables;
import com.mylearning.crunch.FirstTest.WordCounter;

/**
 * Writes a text file out as a SequenceFile of (LongWritable, Text) records,
 * then reads the values back through a Crunch pipeline and runs a max-word-count
 * aggregation over them.
 */
public class SequenceTest {

  @Test
  public void testSequenceFile() throws Exception {
    Path pathName = new Path("sampleFile");

    // Copy each input line into the SequenceFile. Both the line iterator and
    // the writer are closed in finally blocks — the original version leaked
    // the file handle and left the writer open if write() threw.
    LineIterator lineIterator =
        FileUtils.lineIterator(new File("/home/rahul/Downloads/bible.txt"));
    try {
      SequenceWriter writer = new SequenceWriter(new Configuration(), pathName);
      try {
        while (lineIterator.hasNext()) {
          writer.write((String) lineIterator.next());
        }
      } finally {
        writer.close();
      }
    } finally {
      lineIterator.close();
    }
    System.out.println(pathName);

    Pipeline pipeline = MemPipeline.getInstance();
    // Pipeline pipeline = new MRPipeline(SequenceTest.class);

    // Read back only the Text values; reuse pathName rather than constructing
    // a second Path for the same file.
    PCollection<String> textFile =
        pipeline.read(new SeqFileSource<String>(pathName, Writables.strings()));
    Iterator<String> iterator = textFile.materialize().iterator();
    while (iterator.hasNext()) {
      System.out.println(iterator.next());
    }

    // Count words per line, then reduce to the single maximum count.
    PCollection<Integer> lineWordCount = textFile.parallelDo("wordCount",
        new WordCounter(), WritableTypeFamily.getInstance().ints());
    PCollection<Integer> maxValue = Aggregate.max(lineWordCount);
    pipeline.writeTextFile(maxValue, "/home/rahul/crunchOut");
    pipeline.done();
  }

}

/**
 * Thin {@link Closeable} wrapper around a Hadoop {@link SequenceFile.Writer}
 * that appends (LongWritable, Text) records. Every record is written under
 * the constant key 0.
 */
class SequenceWriter implements Closeable {

  /** Underlying Hadoop writer, opened eagerly in the constructor. */
  private final SequenceFile.Writer delegate;

  /**
   * Opens a SequenceFile at {@code pathName} keyed by LongWritable with Text
   * values. Any IOException is rethrown as a RuntimeException with the cause
   * preserved.
   */
  public SequenceWriter(Configuration conf, Path pathName) {
    try {
      delegate = SequenceFile.createWriter(FileSystem.get(conf), conf, pathName,
          LongWritable.class, Text.class);
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
  }

  /** Appends {@code line} as a Text value under the constant key 0. */
  void write(String line) {
    try {
      delegate.append(new LongWritable(0), new Text(line));
    } catch (IOException ioe) {
      throw new RuntimeException(ioe);
    }
  }

  /** Flushes and releases the underlying SequenceFile writer. */
  @Override
  public void close() throws IOException {
    delegate.close();
  }

}

Reply via email to