On Fri, Apr 2, 2010 at 9:34 PM, Zheng Shao <[email protected]> wrote:
> The easiest way is to write a SequenceFileInputFormat that returns a
> RecordReader that has key in the value and value in the key.
>
> Zheng
>
> On Fri, Apr 2, 2010 at 2:16 PM, Edward Capriolo <[email protected]>
> wrote:
> > I have some sequence files in which all our data is in the key.
> >
> > http://osdir.com/ml/hive-user-hadoop-apache/2009-10/msg00027.html
> >
> > Has anyone tackled the above issue?
> >
I am attempting to do this for sequence files. Unfortunately I have to copy
much of SequenceFileRecordReader, since its reader field (in) is private.
----------------------------------------
public class SequenceKeyOnlyInputFormat<K extends WritableComparable,
    V extends Writable> extends SequenceFileInputFormat<K, V> {
  public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
      Reporter reporter) throws IOException {
    reporter.setStatus(split.toString());
    return new SequenceKeyOnlyRecordReader<K, V>(job, (FileSplit) split);
  }
}
--------------------------------------------
@SuppressWarnings({ "unchecked", "deprecation" })
public class SequenceKeyOnlyRecordReader<K extends WritableComparable,
    V extends Writable> implements RecordReader<K, V> {
  private SequenceFile.Reader in;
  private long start;
  private long end;
  private boolean more = true;
  protected Configuration conf;

  public SequenceKeyOnlyRecordReader(Configuration conf, FileSplit split)
      throws IOException {
    Path path = split.getPath();
    FileSystem fs = path.getFileSystem(conf);
    this.in = new SequenceFile.Reader(fs, path, conf);
    this.end = split.getStart() + split.getLength();
    this.conf = conf;
    if (split.getStart() > in.getPosition())
      in.sync(split.getStart()); // sync to start
    this.start = in.getPosition();
    more = start < end;
  }

  /**
   * The class of key that must be passed to {@link #next(Object, Object)}.
   */
  public Class getKeyClass() {
    return in.getKeyClass();
  }

  /**
   * The class of value that must be passed to {@link #next(Object, Object)}.
   */
  public Class getValueClass() {
    return in.getKeyClass();
  }

  public K createKey() {
    return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
  }

  public V createValue() {
    return (V) ReflectionUtils.newInstance(getKeyClass(), conf);
  }

  public synchronized boolean next(K key, V value) throws IOException {
    if (!more) return false;
    long pos = in.getPosition();
    boolean remaining = in.next(key);
    if (remaining) {
      getCurrentValue(value);
    }
    if (pos >= end && in.syncSeen()) {
      more = false;
    } else {
      more = remaining;
    }
    return more;
  }

  protected synchronized boolean next(K key) throws IOException {
    if (!more) return false;
    long pos = in.getPosition();
    boolean remaining = in.next(key);
    if (pos >= end && in.syncSeen()) {
      more = false;
    } else {
      more = remaining;
    }
    return more;
  }

  protected synchronized void getCurrentValue(V value) throws IOException {
    in.getCurrentValue(value);
    //in.next(value);
  }

  /**
   * Return the progress within the input split
   *
   * @return 0.0 to 1.0 of the input byte range
   */
  public float getProgress() throws IOException {
    if (end == start) {
      return 0.0f;
    } else {
      return Math.min(1.0f,
          (in.getPosition() - start) / (float) (end - start));
    }
  }

  public synchronized long getPos() throws IOException {
    return in.getPosition();
  }

  protected synchronized void seek(long pos) throws IOException {
    in.seek(pos);
  }

  public synchronized void close() throws IOException {
    in.close();
  }
}
It seems that this version:

  protected synchronized void getCurrentValue(V value) throws IOException {
    in.getCurrentValue(value);
  }

^ returns nulls, while this version:

  protected synchronized void getCurrentValue(V value) throws IOException {
    in.next(value);
  }

^ returns every other row.
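
My best guess at the every-other-row symptom, as a Hadoop-free toy model (a
sketch only, not verified against a real SequenceFile). ToyReader and its
String[] holder are made-up stand-ins for SequenceFile.Reader and a reusable
Writable; the only behavior borrowed from the real reader is that each next()
call consumes one record. Since next(key) has already consumed a record, a
second in.next(value) for the same row consumes another one:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Toy model only: nothing here is a Hadoop class. */
public class CursorSkipDemo {

    /** Hypothetical forward-only cursor; each next() consumes one record. */
    static final class ToyReader {
        private final List<String> keys;
        private int pos = 0;

        ToyReader(List<String> keys) {
            this.keys = keys;
        }

        /** Copies the next record's key into holder[0]; false at end of data. */
        boolean next(String[] holder) {
            if (pos >= keys.size()) {
                return false;
            }
            holder[0] = keys.get(pos++);
            return true;
        }
    }

    /** Mirrors in.next(key) followed by in.next(value) within one row. */
    public static List<String> readWithTwoNextCalls(List<String> keys) {
        ToyReader in = new ToyReader(keys);
        String[] key = new String[1];
        String[] value = new String[1];
        List<String> rows = new ArrayList<>();
        while (in.next(key)) {
            // The second call advances the cursor again, so the record just
            // read into 'key' is never emitted. (Toy simplification: stop if
            // this call hits the end of the data.)
            if (!in.next(value)) {
                break;
            }
            rows.add(value[0]);
        }
        return rows;
    }

    /** Candidate fix: one next() per row, reading the key into the value slot. */
    public static List<String> readWithOneNextCall(List<String> keys) {
        ToyReader in = new ToyReader(keys);
        String[] value = new String[1];
        List<String> rows = new ArrayList<>();
        while (in.next(value)) {
            rows.add(value[0]);
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("a", "b", "c", "d");
        System.out.println(readWithTwoNextCalls(keys)); // prints [b, d]
        System.out.println(readWithOneNextCall(keys));  // prints [a, b, c, d]
    }
}
```

If the model holds, a single in.next(value) per row, with no preceding
in.next(key), may be enough to get the file's key into the value without
skipping rows. That is an assumption I have not tested against Hive.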
Do you have any idea what I am doing wrong? I hope to contribute it if I
can get this working correctly.
Thanks,
Edward