[ 
https://issues.apache.org/jira/browse/CRUNCH-301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Whiting updated CRUNCH-301:
---------------------------------

    Attachment: IsolatedBug.scala

Attached code to reproduce the issue.

> Cogrouping tables where RHS has a Scala tuple value type causes duplicated 
> and missing RHS values
> -------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-301
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-301
>             Project: Crunch
>          Issue Type: Bug
>          Components: Scrunch
>    Affects Versions: 0.8.0
>         Environment: Hadoop 2
>            Reporter: David Whiting
>         Attachments: IsolatedBug.scala
>
>
> Suppose you have three record types, Rec1, Rec2 and Rec3.
> Rec1 references Rec2 via key1, and Rec2 references Rec3 (one-to-many) by 
> key2. If you innerJoin Rec2 and Rec3 to make a PCollection[(Rec2,Rec3)] and 
> then cogroup it against Rec1, instead of surfacing the n different 
> (Rec2,Rec3) tuples that match each Rec1, it surfaces just one of the (Rec2, 
> Rec3) tuples n times.
> This only happens when running with MRPipeline, and not with MemPipeline.
> This is the simplest complete program I could come up with that produces 
> this unexpected result:
> {code}
> package testcases
> import org.apache.crunch.impl.mr.MRPipeline
> import org.apache.crunch.io.{From, To}
> import org.apache.crunch.scrunch.PCollection
> import org.apache.crunch.types.avro.{ReflectDataFactory, Avros}
> import org.apache.avro.file.DataFileWriter
> import org.apache.hadoop.fs.{Path, FSDataOutputStream}
> import org.apache.hadoop.conf.Configuration
> object IsolatedBug {
>   case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
>   case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = 
> this(0, "", 0.0) }
>   case class Rec3(var k2: String, var v:Int) { def this() = this("", 0)}
>   def run() {
>     val prefix = "/user/davw/tmp/isolation"
>     val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
>     val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6), 
> Rec2(1, "b", 0.7), Rec2(2, "c", 9.9))
>     val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))
>     writeCollection(new Path(prefix + "/ones"), ones)
>     writeCollection(new Path(prefix + "/twos"), twos)
>     writeCollection(new Path(prefix + "/threes"), threes)
>     val pipe = new MRPipeline(getClass)
>     val oneF = new PCollection(pipe.read(From.avroFile(prefix + "/ones", 
> Avros.reflects(classOf[Rec1]))))
>     val twoF = new PCollection(pipe.read(From.avroFile(prefix + "/twos", 
> Avros.reflects(classOf[Rec2]))))
>     val threeF = new PCollection(pipe.read(From.avroFile(prefix + "/threes", 
> Avros.reflects(classOf[Rec3]))))
>     (oneF.by(_.k)
>       cogroup
>       (twoF.by(_.k2)
>          innerJoin threeF.by(_.k2))
>         .values()
>         .by(_._1.k))
>       .values()
>       .map(
>         {case (one, twothree) =>
>           (one ++ twothree)
>             .map(_.toString)
>             .reduce((a,b) => a + "\t" + b)})
>       .write(To.textFile(prefix + "/output"))
>     pipe.done()
>   }
>   def writeCollection(path: Path, records: Iterable[_ <: AnyRef]) {
>     writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), 
> records)
>   }
>   @SuppressWarnings(Array("rawtypes", "unchecked"))
>   private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, 
> records: Iterable[T]) {
>     val r: AnyRef = records.iterator.next()
>     val schema = new ReflectDataFactory().getReflectData.getSchema(r.getClass)
>     val writer = new ReflectDataFactory().getWriter[T](schema)
>     val dataFileWriter = new DataFileWriter(writer)
>     dataFileWriter.create(schema, outputStream)
>     for (record <- records) {
>       dataFileWriter.append(record)
>     }
>     dataFileWriter.close()
>     outputStream.close()
>   }
>   def main(args: Seq[String]) { run() }
> }
> {code}
> The result that is produced is:
> {code}
> Rec1(1,tjena) Rec1(1,hello)   (Rec2(1,a,0.5),Rec3(a,4))       
> (Rec2(1,a,0.5),Rec3(a,4))       (Rec2(1,a,0.5),Rec3(a,4))       
> (Rec2(1,a,0.5),Rec3(a,4))
> Rec1(2,goodbye)       (Rec2(2,c,9.9),Rec3(c,6))
> {code}
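> As you can see, for key 1 a single (Rec2, Rec3) tuple is repeated four 
> times instead of the four distinct tuples (Rec2(1,a,0.4),Rec3(a,4)), 
> (Rec2(1,a,0.5),Rec3(a,4)), (Rec2(1,b,0.6),Rec3(b,5)) and 
> (Rec2(1,b,0.7),Rec3(b,5)) appearing. This does not happen if you join 
> against Rec2 on its own (i.e. without first joining Rec2 with Rec3).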



--
This message was sent by Atlassian JIRA
(v6.1#6144)

Reply via email to