Try this:

protected synchronized void getCurrentValue(V value) throws IOException
{
    in.getCurrentKey(value);
}
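
To show why the two variants quoted below misbehave, here is a minimal sketch in plain Java. It does not use Hadoop at all: FakeReader and Text are hypothetical stand-ins for SequenceFile.Reader and a Writable, and the only assumption is that next(key) advances the reader by one record. Reading the value part of a record gives nothing useful when all the data is in the key, and calling next() a second time consumes the following record, which would explain seeing every other row. Copying the key that was just read into the value avoids both.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class KeyAsValueSketch {

    /** Hypothetical stand-in for a mutable Writable-style object. */
    static final class Text {
        String s = "";
        void set(Text other) { s = other.s; }
    }

    /** Hypothetical stand-in for the reader: each next(key) call consumes one record. */
    static final class FakeReader {
        private final Iterator<String> it;
        FakeReader(List<String> keys) { it = keys.iterator(); }
        boolean next(Text key) {
            if (!it.hasNext()) return false;
            key.s = it.next();
            return true;
        }
    }

    /** Fills the value with a second next() call, so every other record is skipped. */
    static List<String> readWithSecondNext(List<String> keys) {
        FakeReader in = new FakeReader(keys);
        Text key = new Text();
        Text value = new Text();
        List<String> out = new ArrayList<>();
        while (in.next(key)) {
            in.next(value); // advances the reader again; return value ignored, as in the quoted code
            out.add(value.s);
        }
        return out;
    }

    /** Fills the value by copying the key that was just read, so no record is skipped. */
    static List<String> readWithCopy(List<String> keys) {
        FakeReader in = new FakeReader(keys);
        Text key = new Text();
        Text value = new Text();
        List<String> out = new ArrayList<>();
        while (in.next(key)) {
            value.set(key); // reuse the current key instead of reading again
            out.add(value.s);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> keys = Arrays.asList("k1", "k2", "k3", "k4");
        System.out.println(readWithSecondNext(keys)); // prints [k2, k4]
        System.out.println(readWithCopy(keys));       // prints [k1, k2, k3, k4]
    }
}
```

In the real RecordReader the copy step would need a Hadoop-side equivalent of value.set(key). I believe ReflectionUtils.copy(conf, key, value) can do this when key and value are of the same class, but please check that against your Hadoop version.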


On Tue, Apr 13, 2010 at 4:42 PM, Edward Capriolo <[email protected]> wrote:
>
>
> On Fri, Apr 2, 2010 at 9:34 PM, Zheng Shao <[email protected]> wrote:
>>
>> The easiest way is to write a SequenceFileInputFormat that returns a
>> RecordReader that has key in the value and value in the key.
>>
>> Zheng
>>
>> On Fri, Apr 2, 2010 at 2:16 PM, Edward Capriolo <[email protected]>
>> wrote:
>> > I have some sequence files in which all our data is in the key.
>> >
>> > http://osdir.com/ml/hive-user-hadoop-apache/2009-10/msg00027.html
>> >
>> > Has anyone tackled the above issue?
>> >
>> >
>>
>>
>>
>> --
>> Yours,
>> Zheng
>
>
> I am attempting to do this for sequence files. Unfortunately I have to copy
> much of the SequenceFile format since the reader (in) has private access.
> ----------------------------------------
> public class SequenceKeyOnlyInputFormat<K extends WritableComparable, V
> extends Writable> extends SequenceFileInputFormat<K, V> {
>
>     public RecordReader<K, V> getRecordReader(InputSplit split, JobConf job,
> Reporter reporter) throws IOException {
>         reporter.setStatus(split.toString());
>         return new SequenceKeyOnlyRecordReader<K, V>(job, (FileSplit)
> split);
>     }
>
> }
> --------------------------------------------
> @SuppressWarnings({ "unchecked", "deprecation" })
> public class SequenceKeyOnlyRecordReader<K extends WritableComparable , V
> extends Writable>
> implements RecordReader<K, V>{
>
>     private SequenceFile.Reader in;
>     private long start;
>     private long end;
>     private boolean more = true;
>     protected Configuration conf;
>
>
>     public SequenceKeyOnlyRecordReader(Configuration conf, FileSplit split)
> throws IOException {
>         Path path = split.getPath();
>         FileSystem fs = path.getFileSystem(conf);
>         this.in = new SequenceFile.Reader(fs, path, conf);
>         this.end = split.getStart() + split.getLength();
>         this.conf = conf;
>
>         if (split.getStart() > in.getPosition()) in.sync(split.getStart());
> // sync to start
>
>         this.start = in.getPosition();
>         more = start < end;
>     }
>
>     /**
>      * The class of key that must be passed to {@link #next(Object,
> Object)}.
>      */
>     public Class getKeyClass() {
>         return in.getKeyClass();
>     }
>
>     /**
>      * The class of value that must be passed to {@link #next(Object,
> Object)}.
>      */
>     public Class getValueClass() {
>         return in.getKeyClass();
>     }
>
>     public K createKey() {
>         return (K) ReflectionUtils.newInstance(getKeyClass(), conf);
>     }
>
>     public V createValue() {
>         return (V) ReflectionUtils.newInstance(getKeyClass(), conf);
>     }
>
>     public synchronized boolean next(K key, V value) throws IOException {
>         if (!more) return false;
>         long pos = in.getPosition();
>
>         boolean remaining = in.next(key);
>         if (remaining) {
>             getCurrentValue(value);
>         }
>         if (pos >= end && in.syncSeen()) {
>             more = false;
>         } else {
>             more = remaining;
>         }
>         return more;
>     }
>
>     protected synchronized boolean next(K key) throws IOException {
>         if (!more) return false;
>         long pos = in.getPosition();
>         boolean remaining = in.next(key);
>         if (pos >= end && in.syncSeen()) {
>             more = false;
>         } else {
>             more = remaining;
>         }
>         return more;
>     }
>
>     protected synchronized void getCurrentValue(V value) throws IOException
> {
>          in.getCurrentValue(value);
>         //in.next(value);
>     }
>
>     /**
>      * Return the progress within the input split
>      *
>      * @return 0.0 to 1.0 of the input byte range
>      */
>     public float getProgress() throws IOException {
>         if (end == start) {
>             return 0.0f;
>         } else {
>             return Math.min(1.0f, (in.getPosition() - start) / (float) (end
> - start));
>         }
>     }
>
>     public synchronized long getPos() throws IOException {
>         return in.getPosition();
>     }
>
>     protected synchronized void seek(long pos) throws IOException {
>         in.seek(pos);
>     }
>
>     public synchronized void close() throws IOException {
>         in.close();
>     }
>
> }
>
> seems like:
>
>     protected synchronized void getCurrentValue(V value) throws IOException
> {
>          in.getCurrentValue(value);
>     }
>
> ^ Returns nulls
>
>     protected synchronized void getCurrentValue(V value) throws IOException
> {
>        in.next(value);
>     }
>
> ^ returns every other row.
>
> Do you have any idea what I am doing wrong? I hope to contribute it if I
> can get this working correctly.
>
> Thanks,
> Edward
>



-- 
Yours,
Zheng
http://www.linkedin.com/in/zshao
