Hi there,

This will be a little long, so please bear with me. There is a buildable example available at https://github.com/sfcoy/sfcoy-spark-cce-test.
Say I have the following three tables:

Machines

    Id,MachineType
    100001,A
    100002,B
    200003,B
    200004,A
    200005,B

Bolts

    MachineType,Description
    A,20 x M5
    A,30 x M5
    B,"2"" x 1/4"""
    B,"2"" x 1/2"""
    B,"2"" x 3/8"""
    A,40 x M6
    A,50 x M10
    B,"1"" x 1/8"""

Nuts

    MachineType,Description
    A,M5
    A,M6
    B,"1/4"""
    B,"1/8"""
    B,"3/8"""

The objective is to create lists of Machines by Id, with all of their bolts and nuts listed on the same line:

    100001,20 x M5,30 x M5,40 x M6,50 x M10,M5,M6

The output is further categorised by the first 5 digits of the machine id, although that seems immaterial to this problem. In practice I’m dealing with ~70 million machines with a couple of hundred thousand types - therefore Spark!

The code to do this looks like:

final Dataset<Machine> machineRecords = sparkSession
    .read()
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("src/main/data/machines.csv")
    .as(Encoders.bean(Machine.class))
    .persist();

final int workerCount = sparkContext.defaultParallelism();

// Collect all nuts for each machine type into a single List
final JavaPairRDD<String, List<Nut>> nutsByMachine = sparkSession
    .read()
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("src/main/data/nuts.csv")
    .as(Encoders.bean(Nut.class))
    .toJavaRDD()
    .mapToPair(nut -> new Tuple2<>(nut.getMachineType(), nut))
    .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
    .combineByKey(SparkCCETest::createListAndCombine,
                  SparkCCETest::mergeValues,
                  SparkCCETest::mergeCombiners)
    .persist(StorageLevel.MEMORY_AND_DISK());

// Same again for bolts
final JavaPairRDD<String, List<Bolt>> boltsByMachine = sparkSession
    .read()
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("src/main/data/bolts.csv")
    .as(Encoders.bean(Bolt.class))
    .toJavaRDD()
    .mapToPair(bolt -> new Tuple2<>(bolt.getMachineType(), bolt))
    .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
    .combineByKey(SparkCCETest::createListAndCombine,
                  SparkCCETest::mergeValues,
                  SparkCCETest::mergeCombiners)
    .persist(StorageLevel.MEMORY_AND_DISK());

// Join each machine with its nuts and bolts, then bucket the output by export file
machineRecords
    .toJavaRDD()
    .mapToPair(machine -> new Tuple2<>(machine.getMachineType(), machine))
    .join(nutsByMachine)
    .join(boltsByMachine)
    .map(Tuple2::_2)
    .map(tuples -> new Tuple3<>(tuples._1._1, tuples._1._2, tuples._2))
    .mapToPair(machineWithNutsBolts ->
        new Tuple2<>(exportFileFor(machineWithNutsBolts._1()), machineWithNutsBolts))
    .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
    .foreachPartition(machineIterator -> { // <- line 77, which appears in the driver stack trace below
        //...
    });

// Derives the output file name from the first 5 digits of the machine id
static String exportFileFor(Machine machine) {
    return machine.getId().substring(0, 5);
}

static <T> List<T> createListAndCombine(T v) {
    List<T> c = new ArrayList<>();
    c.add(v);
    return c;
}

static <T> List<T> mergeValues(List<T> c, T v) {
    c.add(v);
    return c;
}

static <T> List<T> mergeCombiners(List<T> c1, List<T> c2) {
    c1.addAll(c2);
    return c1;
}

Running this yields a ClassCastException:

20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_1 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_2 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
20/05/22 14:05:31 WARN BlockManager: Block rdd_47_2 could not be removed as it was not found on disk or in memory
20/05/22 14:05:31 WARN BlockManager: Block rdd_47_1 could not be removed as it was not found on disk or in memory
20/05/22 14:05:31 ERROR Executor: Exception in task 2.0 in stage 9.0 (TID 13)
java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
    ...
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/05/22 14:05:31 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 12)
java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
    ...
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
…
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1979)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1967)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1966)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1966)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:946)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:946)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:946)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2196)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2145)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2134)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:748)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2095)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2160)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:994)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
    at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:992)
    at org.apache.spark.api.java.JavaRDDLike.foreachPartition(JavaRDDLike.scala:219)
    at org.apache.spark.api.java.JavaRDDLike.foreachPartition$(JavaRDDLike.scala:218)
    at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
    at org.example.SparkCCETest.main(SparkCCETest.java:77)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
    at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1$adapted(ExternalAppendOnlyMap.scala:151)
    at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
    at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
    at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:164)
    at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
    at org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$3(PairRDDFunctions.scala:92)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1306)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1233)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1297)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1121)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
    at org.apache.spark.rdd.CoGroupedRDD.$anonfun$compute$2(CoGroupedRDD.scala:140)
    at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
    at scala.collection.immutable.List.foreach(List.scala:392)
    at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
    at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

Anyway, to cut a long story short, while creating this reproducer it occurred to me to replace those generic methods at the bottom of the code with explicitly typed versions (see the sketch in the P.P.S. below). This made the problem go away. It seems like a workaround, but does anyone think this could be a bug?

Thanks,
Steve C

P.S. I’m relatively new to Apache Spark, so if anyone thinks I’m going about this the wrong way then I would be pleased to hear any better ideas.
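P.P.S. For anyone who wants the workaround spelled out, here is a minimal sketch of the explicitly typed versions for Nut. The method names here are illustrative rather than exactly what is in the repo, and the Bolt versions are identical apart from the element type:

    // Explicitly typed equivalents of the generic helpers above.
    // Swapping these in for the generic method references made the
    // ClassCastException go away.
    static List<Nut> createNutListAndCombine(Nut v) {
        List<Nut> c = new ArrayList<>();
        c.add(v);
        return c;
    }

    static List<Nut> mergeNutValues(List<Nut> c, Nut v) {
        c.add(v);
        return c;
    }

    static List<Nut> mergeNutCombiners(List<Nut> c1, List<Nut> c2) {
        c1.addAll(c2);
        return c1;
    }

The corresponding combineByKey call in the nutsByMachine pipeline then becomes:

    .combineByKey(SparkCCETest::createNutListAndCombine,
                  SparkCCETest::mergeNutValues,
                  SparkCCETest::mergeNutCombiners)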