On Fri, Apr 2, 2010 at 9:34 PM, Zheng Shao <[email protected]> wrote:

> The easiest way is to write a SequenceFileInputFormat that returns a
> RecordReader that has key in the value and value in the key.
>
> Zheng
>
> On Fri, Apr 2, 2010 at 2:16 PM, Edward Capriolo <[email protected]>
> wrote:
> > I have some sequence files in which all our data is in the key.
> >
> > http://osdir.com/ml/hive-user-hadoop-apache/2009-10/msg00027.html
> >
> > Has anyone tackled the above issue?
> >
> >
>
>
>
> --
> Yours,
> Zheng
>


I am attempting to do this for sequence files. Unfortunately, I have to copy
much of the SequenceFileRecordReader code, since its reader field (in) is
private.
----------------------------------------
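import java.io.IOException;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SequenceFileInputFormat;

public class SequenceKeyOnlyInputFormat<K extends WritableComparable, V extends Writable>
        extends SequenceFileInputFormat<K, V> {

    public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
            Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        return new SequenceKeyOnlyRecordReader<K, V>(job, (FileSplit) split);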
    }

}
--------------------------------------------
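import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.ReflectionUtils;

@SuppressWarnings({ "unchecked", "deprecation" })
public class SequenceKeyOnlyRecordReader<K extends WritableComparable, V extends Writable>
        implements RecordReader<K, V> {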

    private SequenceFile.Reader in;
    private long start;
    private long end;
    private boolean more = true;
    protected Configuration conf;


    public SequenceKeyOnlyRecordReader(Configuration conf, FileSplit split)
            throws IOException {
        Path path = split.getPath();
        FileSystem fs = path.getFileSystem(conf);
        this.in = new SequenceFile.Reader(fs, path, conf);
        this.end = split.getStart() + split.getLength();
        this.conf = conf;

        // sync to start
        if (split.getStart() > in.getPosition()) in.sync(split.getStart());

        this.start = in.getPosition();
        more = start < end;
    }

    /**
     * The class of key that must be passed to {@link #next(Object, Object)}.
     */
    public Class getKeyClass() {
        return in.getKeyClass();
    }

    /**
     * The class of value that must be passed to {@link #next(Object, Object)}.
     */
    public Class getValueClass() {
        return in.getKeyClass();
    }

    public K createKey() {
        return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
    }

    public V createValue() {
        return (V) ReflectionUtils.newInstance(getKeyClass(), conf);
    }

    public synchronized boolean next(K key, V value) throws IOException {
        if (!more) return false;
        long pos = in.getPosition();

        boolean remaining = in.next(key);
        if (remaining) {
            getCurrentValue(value);
        }
        if (pos >= end && in.syncSeen()) {
            more = false;
        } else {
            more = remaining;
        }
        return more;
    }

    protected synchronized boolean next(K key) throws IOException {
        if (!more) return false;
        long pos = in.getPosition();
        boolean remaining = in.next(key);
        if (pos >= end && in.syncSeen()) {
            more = false;
        } else {
            more = remaining;
        }
        return more;
    }

    protected synchronized void getCurrentValue(V value) throws IOException {
        in.getCurrentValue(value);
        //in.next(value);
    }

    /**
     * Return the progress within the input split
     *
     * @return 0.0 to 1.0 of the input byte range
     */
    public float getProgress() throws IOException {
        if (end == start) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (in.getPosition() - start) / (float) (end - start));
        }
    }

    public synchronized long getPos() throws IOException {
        return in.getPosition();
    }

    protected synchronized void seek(long pos) throws IOException {
        in.seek(pos);
    }

    public synchronized void close() throws IOException {
        in.close();
    }

}

It seems like:

    protected synchronized void getCurrentValue(V value) throws IOException {
        in.getCurrentValue(value);
    }

^ Returns nulls

    protected synchronized void getCurrentValue(V value) throws IOException {
        in.next(value);
    }

^ returns every other row.

Do you have any idea what I am doing wrong? I hope to contribute this if I
can get it working correctly.

Thanks,
Edward
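
P.S. A minimal sketch of what may be happening in the "every other row" case,
under stated assumptions. FakeReader and KeyAsValueDemo are hypothetical
stand-ins written for illustration; they are not Hadoop classes, and
StringBuilder plays the role of a reusable Writable holder. The only behavior
borrowed from the code above is that each call to next(...) advances the
reader by one record. Calling next(key) and then next(value) therefore
consumes two records per row, while advancing once and reading the key
straight into the value holder yields every row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a sequential reader; NOT Hadoop's SequenceFile.Reader.
class FakeReader {
    private final List<String> keys;
    private int pos = 0;

    FakeReader(List<String> keys) {
        this.keys = keys;
    }

    // Copies the next record's key into holder and advances by one record.
    // Returns false once the input is exhausted.
    boolean next(StringBuilder holder) {
        if (pos >= keys.size()) return false;
        holder.setLength(0);
        holder.append(keys.get(pos++));
        return true;
    }
}

class KeyAsValueDemo {
    // Mirrors the second variant: next(key) followed by next(value),
    // so the reader advances twice for every row handed back.
    static List<String> readTwicePerRow(List<String> keys) {
        FakeReader in = new FakeReader(keys);
        StringBuilder key = new StringBuilder();
        StringBuilder value = new StringBuilder();
        List<String> out = new ArrayList<>();
        while (in.next(key)) {
            if (!in.next(value)) break; // this call consumes the following record
            out.add(value.toString());
        }
        return out;
    }

    // Alternative: advance once per row and read the key directly into the value holder.
    static List<String> readOncePerRow(List<String> keys) {
        FakeReader in = new FakeReader(keys);
        StringBuilder value = new StringBuilder();
        List<String> out = new ArrayList<>();
        while (in.next(value)) {
            out.add(value.toString());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("k1", "k2", "k3", "k4");
        System.out.println(readTwicePerRow(keys)); // prints [k2, k4]
        System.out.println(readOncePerRow(keys));  // prints [k1, k2, k3, k4]
    }
}
```

If the same reasoning carries over to the real reader, one thing to try (an
untested assumption on my part) would be to drop the in.next(key) call in
next(K key, V value) and call in.next(value) alone, since the value object is
already created from the key class.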
