[ https://issues.apache.org/jira/browse/CRUNCH-301?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
David Whiting updated CRUNCH-301:
---------------------------------
Attachment: IsolatedBug.scala
Attached code to reproduce
> Cogrouping tables where RHS has a Scala tuple value type causes duplicated and missing RHS values
> -------------------------------------------------------------------------------------------------
>
> Key: CRUNCH-301
> URL: https://issues.apache.org/jira/browse/CRUNCH-301
> Project: Crunch
> Issue Type: Bug
> Components: Scrunch
> Affects Versions: 0.8.0
> Environment: Hadoop 2
> Reporter: David Whiting
> Attachments: IsolatedBug.scala
>
>
> Suppose you have three record types, Rec1, Rec2 and Rec3.
> Rec1 references Rec2 via key1, and Rec2 references Rec3 (one-to-many) by
> key2. If you innerJoin Rec2 and Rec3 to make a PCollection[(Rec2,Rec3)] and
> then cogroup it against Rec1, the result does not contain the n distinct
> (Rec2,Rec3) tuples that apply to a given Rec1 key. Instead it contains just
> one of those (Rec2,Rec3) tuples, repeated n times.
> This only happens when running with MRPipeline, and not with MemPipeline.
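The report does not diagnose the cause. One plausible mechanism fits the MRPipeline-only symptom, but it is an assumption here. On the MapReduce path, Hadoop's reduce-side value iterator typically deserializes each value into a reused object. Code that keeps references to those values without copying them ends up holding several references to one object. MemPipeline presumably works with ordinary in-memory objects and would not show the effect. The plain-Scala sketch below simulates that aliasing. It does not use Crunch, and `MutableRec`, `reusingIterator` and `detach` are hypothetical names.

```scala
// Hypothetical mutable record standing in for a deserialized value.
final class MutableRec(var k: Int, var v: Double) {
  // Returns an independent copy of the current field values.
  def detach(): MutableRec = new MutableRec(k, v)
  override def toString: String = s"MutableRec($k,$v)"
}

object ObjectReuseDemo {
  // Simulates a framework that deserializes every value into one shared instance
  // (an assumed model of reduce-side object reuse, not Crunch code).
  def reusingIterator(data: Seq[(Int, Double)]): Iterator[MutableRec] = {
    val shared = new MutableRec(0, 0.0)
    data.iterator.map { case (k, v) =>
      shared.k = k
      shared.v = v
      shared
    }
  }

  def main(args: Array[String]): Unit = {
    val data = Seq((1, 0.4), (1, 0.5), (1, 0.6), (1, 0.7))
    // Keeping the references: every list element is the same object.
    val aliased = reusingIterator(data).toList
    // Copying each value as it is read preserves the distinct values.
    val detached = reusingIterator(data).map(_.detach()).toList
    println(aliased.mkString(" "))
    println(detached.mkString(" "))
  }
}
```

The first printed line shows the last value four times, and the other three values are gone. This mirrors the "duplicated and missing" pattern in the report. The second line, which copies each value as it is read, shows all four values.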
> This is the simplest complete program I could come up with which will produce
> this unexpected result:
> {code}
> package testcases
>
> import org.apache.crunch.impl.mr.MRPipeline
> import org.apache.crunch.io.{From, To}
> import org.apache.crunch.scrunch.PCollection
> import org.apache.crunch.types.avro.{ReflectDataFactory, Avros}
> import org.apache.avro.file.DataFileWriter
> import org.apache.hadoop.fs.{Path, FSDataOutputStream}
> import org.apache.hadoop.conf.Configuration
>
> object IsolatedBug {
>   // The no-arg constructors let Avro reflection instantiate the records.
>   case class Rec1(var k: Int, var v: String) { def this() = this(0, "") }
>   case class Rec2(var k: Int, var k2: String, var v: Double) { def this() = this(0, "", 0.0) }
>   case class Rec3(var k2: String, var v: Int) { def this() = this("", 0) }
>
>   def run(): Unit = {
>     val prefix = "/user/davw/tmp/isolation"
>
>     // Rec1 and Rec2 share key k; Rec2 and Rec3 share key k2.
>     val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
>     val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6),
>       Rec2(1, "b", 0.7), Rec2(2, "c", 9.9))
>     val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))
>     writeCollection(new Path(prefix + "/ones"), ones)
>     writeCollection(new Path(prefix + "/twos"), twos)
>     writeCollection(new Path(prefix + "/threes"), threes)
>
>     val pipe = new MRPipeline(getClass)
>     val oneF = new PCollection(pipe.read(From.avroFile(prefix + "/ones", Avros.reflects(classOf[Rec1]))))
>     val twoF = new PCollection(pipe.read(From.avroFile(prefix + "/twos", Avros.reflects(classOf[Rec2]))))
>     val threeF = new PCollection(pipe.read(From.avroFile(prefix + "/threes", Avros.reflects(classOf[Rec3]))))
>
>     // Join Rec2 with Rec3 on k2, re-key the (Rec2, Rec3) pairs by Rec2.k,
>     // then cogroup them against Rec1 keyed by k.
>     (oneF.by(_.k)
>       cogroup
>       (twoF.by(_.k2)
>         innerJoin threeF.by(_.k2))
>         .values()
>         .by(_._1.k))
>       .values()
>       // One tab-separated line per key: every Rec1, then every (Rec2, Rec3) pair.
>       .map(
>         { case (one, twothree) =>
>           (one ++ twothree)
>             .map(_.toString)
>             .reduce((a, b) => a + "\t" + b) })
>       .write(To.textFile(prefix + "/output"))
>     pipe.done()
>   }
>
>   def writeCollection(path: Path, records: Iterable[_ <: AnyRef]): Unit = {
>     writeAvroFile(path.getFileSystem(new Configuration()).create(path, true), records)
>   }
>
>   // Writes the records to an Avro data file, using the reflected schema of the first record.
>   @SuppressWarnings(Array("rawtypes", "unchecked"))
>   private def writeAvroFile[T <: AnyRef](outputStream: FSDataOutputStream, records: Iterable[T]): Unit = {
>     val r: AnyRef = records.iterator.next()
>     val schema = new ReflectDataFactory().getReflectData.getSchema(r.getClass)
>     val writer = new ReflectDataFactory().getWriter[T](schema)
>     val dataFileWriter = new DataFileWriter(writer)
>     dataFileWriter.create(schema, outputStream)
>     for (record <- records) {
>       dataFileWriter.append(record)
>     }
>     dataFileWriter.close()
>     outputStream.close()
>   }
>
>   // JVM entry point: the parameter must be Array[String].
>   def main(args: Array[String]): Unit = run()
> }
> {code}
> The result that is produced is:
> {code}
> Rec1(1,tjena) Rec1(1,hello) (Rec2(1,a,0.5),Rec3(a,4)) (Rec2(1,a,0.5),Rec3(a,4)) (Rec2(1,a,0.5),Rec3(a,4)) (Rec2(1,a,0.5),Rec3(a,4))
> Rec1(2,goodbye) (Rec2(2,c,9.9),Rec3(c,6))
> {code}
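> As you can see, for key 1 a single (Rec2, Rec3) tuple, (Rec2(1,a,0.5),Rec3(a,4)),
> is repeated four times instead of the four distinct tuples produced by the
> join. This does not happen if you join Rec1 against Rec2 on its own, i.e. when
> the right-hand value type is not a tuple.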
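The reported output says "all the distinct ones" but does not list them. To make that concrete, the same inner join and cogroup can be modelled over plain Scala collections using the report's sample data. This is a sketch that assumes cogroup should return every value for a key exactly once, which MemPipeline reportedly does. `ExpectedOutput` and `lines` are hypothetical names, the records are immutable copies of Rec1/Rec2/Rec3, and no Crunch or Avro classes are involved.

```scala
object ExpectedOutput {
  // Immutable stand-ins for the report's record types.
  case class Rec1(k: Int, v: String)
  case class Rec2(k: Int, k2: String, v: Double)
  case class Rec3(k2: String, v: Int)

  val ones = Seq(Rec1(1, "hello"), Rec1(1, "tjena"), Rec1(2, "goodbye"))
  val twos = Seq(Rec2(1, "a", 0.4), Rec2(1, "a", 0.5), Rec2(1, "b", 0.6),
    Rec2(1, "b", 0.7), Rec2(2, "c", 9.9))
  val threes = Seq(Rec3("a", 4), Rec3("b", 5), Rec3("c", 6))

  // Inner join of Rec2 and Rec3 on k2: every matching pair.
  val twoThree: Seq[(Rec2, Rec3)] =
    for (t <- twos; r <- threes if t.k2 == r.k2) yield (t, r)

  // Cogroup on k: one line per key holding every Rec1 and every (Rec2, Rec3)
  // pair for that key, tab-separated like the job's final map.
  def lines: Seq[String] = {
    val keys = (ones.map(_.k) ++ twoThree.map(_._1.k)).distinct.sorted
    keys.map { key =>
      val one = ones.filter(_.k == key)
      val twothree = twoThree.filter(_._1.k == key)
      (one ++ twothree).map(_.toString).mkString("\t")
    }
  }

  def main(args: Array[String]): Unit = lines.foreach(println)
}
```

For key 1 this yields the four distinct pairs (Rec2(1,a,0.4),Rec3(a,4)), (Rec2(1,a,0.5),Rec3(a,4)), (Rec2(1,b,0.6),Rec3(b,5)) and (Rec2(1,b,0.7),Rec3(b,5)). The MapReduce run printed only the second of these, four times. MapReduce does not guarantee the order of values within a group (the reported output lists tjena before hello), so compare the lines as multisets rather than as exact strings.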
--
This message was sent by Atlassian JIRA
(v6.1#6144)