Guys,
I have a SequenceFile with LongWritable keys and Text values. I am using
SeqFileSource with an MRPipeline, but when I use MemPipeline it throws
the following exception:
3503 [main] INFO com.cloudera.crunch.io.seq.SeqFileReaderFactory - Error reading from path: file:/home/rahul/software/crunch/sampleFile
java.io.IOException: wrong key class: org.apache.hadoop.io.ObjectWritable is not class org.apache.hadoop.io.LongWritable
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1895)
        at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1947)
        at com.cloudera.crunch.io.seq.SeqFileReaderFactory$1.hasNext(SeqFileReaderFactory.java:68)
        at com.cloudera.crunch.io.CompositePathIterable$2.hasNext(CompositePathIterable.java:81)
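A quick check of the file header (on the local FS) shows what the file
actually declares:

    Configuration conf = new Configuration();
    SequenceFile.Reader check = new SequenceFile.Reader(FileSystem.getLocal(conf),
        new Path("/home/rahul/software/crunch/sampleFile"), conf);
    // given how the file was written below, this prints LongWritable / Text
    System.out.println(check.getKeyClass() + " / " + check.getValueClass());
    check.close();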
So the failure is because the file contains LongWritable keys, but the
reader instantiates a different Writable (an ObjectWritable, per the
trace above) to read them. The error shows up in MemPipeline only; it
works in the MRPipeline because there the key class is passed in through
Hadoop's MapContext and is therefore the correct one. I modified
SeqFileReaderFactory to pass the key class as well, but is this the
correct way of doing so?
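The change I tried is roughly this (a sketch from memory, not the exact
patch; the idea is to take the key class from the reader's own header
instead of a fixed Writable):

    // sketch only: the surrounding factory code is omitted
    SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
    Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
    Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
    while (reader.next(key, value)) {
      // hand the value off to the PType's input MapFn, as the factory already does
    }
    reader.close();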
regards
Rahul
package com.mylearning.crunch;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.LineIterator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.junit.Test;
import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.impl.mem.MemPipeline;
import com.cloudera.crunch.io.seq.SeqFileSource;
import com.cloudera.crunch.lib.Aggregate;
import com.cloudera.crunch.types.writable.WritableTypeFamily;
import com.cloudera.crunch.types.writable.Writables;
import com.mylearning.crunch.FirstTest.WordCounter;
public class SequenceTest {

  @Test
  public void testSequenceFile() throws Exception {
    // Write each line of a sample text file into a SequenceFile with
    // LongWritable keys and Text values.
    LineIterator lineIterator = FileUtils.lineIterator(new File("/home/rahul/Downloads/bible.txt"));
    Path pathName = new Path("sampleFile");
    SequenceWriter writer = new SequenceWriter(new Configuration(), pathName);
    while (lineIterator.hasNext()) {
      String data = (String) lineIterator.next();
      writer.write(data);
    }
    writer.close();
    LineIterator.closeQuietly(lineIterator); // release the input file handle
    System.out.println(pathName);

    Pipeline pipeline = MemPipeline.getInstance();
    // Pipeline pipeline = new MRPipeline(SequenceTest.class);

    // This read is what fails in MemPipeline with the "wrong key class"
    // IOException; the same code runs fine with MRPipeline.
    PCollection<String> textFile = pipeline.read(
        new SeqFileSource<String>(new Path("sampleFile"), Writables.strings()));
    Iterator<String> iterator = textFile.materialize().iterator();
    while (iterator.hasNext()) {
      System.out.println(iterator.next());
    }

    // Count the words on each line and take the maximum count.
    PCollection<Integer> lineWordCount = textFile.parallelDo("wordCount",
        new WordCounter(), WritableTypeFamily.getInstance().ints());
    PCollection<Integer> maxValue = Aggregate.max(lineWordCount);
    pipeline.writeTextFile(maxValue, "/home/rahul/crunchOut");
    pipeline.done();
  }
}
class SequenceWriter implements Closeable {

  private SequenceFile.Writer writer;

  public SequenceWriter(Configuration conf, Path pathName) {
    try {
      // Keys are LongWritable, values are Text.
      writer = SequenceFile.createWriter(FileSystem.get(conf), conf, pathName,
          LongWritable.class, Text.class);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  void write(String string) {
    try {
      // The key is a constant 0 here; only the Text value matters for this test.
      writer.append(new LongWritable(0), new Text(string));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }
}
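P.S. In the meantime, reading the file as a table, so that both classes
are known up front, might sidestep the problem in MemPipeline. Untested
sketch, assuming SeqFileTableSource and Writables.tableOf work the way I
read them:

    PTable<Long, String> table = pipeline.read(new SeqFileTableSource<Long, String>(
        new Path("sampleFile"),
        Writables.tableOf(Writables.longs(), Writables.strings())));
    // keep only the Text values, as with the plain SeqFileSource
    PCollection<String> textFile = table.values();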