Hi,

I am trying to create multiple iterators in a DoFn process method.

  public void process(Pair<Integer, Iterable<TupleN>> input,
      Emitter<Pair<String, Integer>> emitter) {}

Every time I ask for an iterator it gives back the same one, so I cannot traverse the list more than once, and I am hitting the following stack trace.
1575 [Thread-23] WARN org.apache.hadoop.mapred.LocalJobRunner - job_local_0001
java.util.NoSuchElementException: iterate past last value
at org.apache.hadoop.mapreduce.ReduceContext$ValueIterator.next(ReduceContext.java:159)
at com.cloudera.crunch.types.PGroupedTableType$PTypeIterable$1.next(PGroupedTableType.java:56)
at com.mylearning.crunch.TuplesTest$ScoreCalculator.process(TuplesTest.java:52)
at com.mylearning.crunch.TuplesTest$ScoreCalculator.process(TuplesTest.java:1)
at com.cloudera.crunch.impl.mr.run.RTNode.process(RTNode.java:85)
at com.cloudera.crunch.impl.mr.emit.IntermediateEmitter.emit(IntermediateEmitter.java:39)
...............


What I see is that PGroupedTableType hands back the same iterator it gets from the ReducerContext, and that is why I see the exception.
Shouldn't it give back a new iterator every time I ask for one?
If not, what is the recommended way of doing this? (I could iterate once, adding the elements to a collection, but I don't know if that is the right way to go forward.)
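As a sketch of the collection-based workaround I mentioned (plain Java, not Crunch-specific, and assuming the group of values fits in memory), copying the single-pass Iterable into a List lets you obtain as many independent iterators as you need:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class MaterializeExample {

  // Copy a single-pass Iterable into an in-memory List so it can be
  // traversed repeatedly. Each call to list.iterator() is independent.
  static <T> List<T> materialize(Iterable<T> values) {
    List<T> copy = new ArrayList<T>();
    for (T value : values) {
      copy.add(value);
    }
    return copy;
  }

  public static void main(String[] args) {
    List<String> tuples = materialize(Arrays.asList("a", "b", "c"));
    Iterator<String> first = tuples.iterator();
    Iterator<String> second = tuples.iterator();
    first.next(); // advancing one iterator does not affect the other
    System.out.println(second.next()); // prints "a"
  }
}
```

The obvious trade-off is memory: this only works when a single key's group of values is small enough to hold on the heap.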
I have attached my test case for your reference.

regards,
Rahul
package com.mylearning.crunch;

import java.util.Iterator;

import org.junit.Test;

import com.cloudera.crunch.DoFn;
import com.cloudera.crunch.Emitter;
import com.cloudera.crunch.PCollection;
import com.cloudera.crunch.PGroupedTable;
import com.cloudera.crunch.PTable;
import com.cloudera.crunch.Pair;
import com.cloudera.crunch.Pipeline;
import com.cloudera.crunch.TupleN;
import com.cloudera.crunch.impl.mr.MRPipeline;
import com.cloudera.crunch.test.FileHelper;
import com.cloudera.crunch.types.writable.TupleWritable;
import com.cloudera.crunch.types.writable.WritableType;
import com.cloudera.crunch.types.writable.Writables;

public class TuplesTest {

  static class LineSpliter extends DoFn<String, Pair<Integer, TupleN>> {

    private static final long serialVersionUID = 1L;

    public void process(String line, Emitter<Pair<Integer, TupleN>> emitter) {
      String[] splitData = line.split(",");
      Object[] objects = new Object[splitData.length];
      for (int pos = 0; pos < objects.length; pos++) {
        objects[pos] = splitData[pos];
      }
      emitter.emit(new Pair<Integer, TupleN>(line.hashCode(), new TupleN(objects)));
    }

  }

  static class ScoreCalculator extends
      DoFn<Pair<Integer, Iterable<TupleN>>, Pair<String, Integer>> {

    private static final long serialVersionUID = 1L;

    @Override
    public void process(Pair<Integer, Iterable<TupleN>> input,
        Emitter<Pair<String, Integer>> emitter) {
      Iterator<TupleN> primary = input.second().iterator();
      int pos = 1;
      while (primary.hasNext()) {
        Iterator<TupleN> secondary = getSecondary(input, pos++);
        if (secondary == null) {
          return;
        }
        TupleN first = (TupleN) primary.next();
        while (secondary.hasNext()) {
          TupleN second = (TupleN) secondary.next();
        emitter.emit(new Pair<String, Integer>(first.toString() + " : " + second.toString(), 100));
        }
      }
    }

    private Iterator<TupleN> getSecondary(Pair<Integer, Iterable<TupleN>> input, int pos) {
      Iterator<TupleN> secondary = input.second().iterator();
      for (int i = 0; i < pos; i++) {
        if (!secondary.hasNext()) {
          return null;
        }
        secondary.next();
      }
      return secondary;
    }

  }

  @Test
  public void shouldGiveSomeResults() throws Exception {
    String fileLoc = FileHelper.createTempCopyOf("person-data.txt");
    Pipeline pipeline = new MRPipeline(TuplesTest.class);
    PCollection<String> readLines = pipeline.readTextFile(fileLoc);

    WritableType<TupleN, TupleWritable> tuples = Writables.tuples(Writables.strings(),
        Writables.strings(), Writables.strings(), Writables.strings(), Writables.strings());

    PTable<Integer, TupleN> classifiedData = readLines.parallelDo("reading lines",
        new LineSpliter(), Writables.tableOf(Writables.ints(), tuples));
    PGroupedTable<Integer, TupleN> groupedData = classifiedData.groupByKey();
    PTable<String, Integer> scores = groupedData.parallelDo("compute scores",
        new ScoreCalculator(), Writables.tableOf(Writables.strings(), Writables.ints()));
    pipeline.writeTextFile(scores, "/home/rahul/crunchOut");
    pipeline.done();
  }
}
name,Fname,LName,Country,DoB
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
Rahul Sharma,Rahul,Sharma,India,1-1-1999
