Hi Sean,

The fix was to provide explicitly typed versions of the three generic methods at the bottom of the code:

For example,

static <T> List<T> createListAndCombine(T v) {
    List<T> c = new ArrayList<>();
    c.add(v);
    return c;
}


becomes


static List<Nut> createListAndCombine(Nut v) {
    List<Nut> c = new ArrayList<>();
    c.add(v);
    return c;
}


static List<Bolt> createListAndCombine(Bolt v) {
    List<Bolt> c = new ArrayList<>();
    c.add(v);
    return c;
}


etc.
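
For completeness, the corresponding typed versions of the other two helpers look like this (the Nut flavours are shown; the Bolt versions are identical apart from the type):

static List<Nut> mergeValues(List<Nut> c, Nut v) {
    c.add(v);
    return c;
}

static List<Nut> mergeCombiners(List<Nut> c1, List<Nut> c2) {
    c1.addAll(c2);
    return c1;
}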

I forgot to mention that this issue appears with both Java 8/Spark 2.4.5 and 
Java 11/Spark 3.0.0-rc2.

Thanks for the SQL tip. I will investigate further.
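
My reading of that suggestion is something along these lines (an untested sketch using the Java Dataset API, with the same sparkSession and CSV files as the code below):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.collect_list;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Aggregate the part descriptions per MachineType with collect_list(),
// then join both aggregates back onto Machines.
Dataset<Row> machines = sparkSession.read().format("csv")
    .option("header", "true").load("src/main/data/machines.csv");
Dataset<Row> nuts = sparkSession.read().format("csv")
    .option("header", "true").load("src/main/data/nuts.csv");
Dataset<Row> bolts = sparkSession.read().format("csv")
    .option("header", "true").load("src/main/data/bolts.csv");

Dataset<Row> nutsByType = nuts.groupBy("MachineType")
    .agg(collect_list("Description").as("Nuts"));
Dataset<Row> boltsByType = bolts.groupBy("MachineType")
    .agg(collect_list("Description").as("Bolts"));

Dataset<Row> result = machines
    .join(boltsByType, "MachineType")
    .join(nutsByType, "MachineType")
    .select(col("Id"), col("Bolts"), col("Nuts"));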

Cheers,

Steve C

On 22 May 2020, at 11:22 pm, Sean Owen <sro...@gmail.com> wrote:

I don't immediately see what the issue could be - try .count()-ing the
individual RDDs to narrow it down?
What code change made it work?

Also I think this could probably be a few lines of SQL with an
aggregate, collect_list(), and joins.

On Thu, May 21, 2020 at 11:27 PM Stephen Coy <s...@infomedia.com.au.invalid> wrote:

Hi there,

This will be a little long, so please bear with me. There is a buildable example 
available at https://github.com/sfcoy/sfcoy-spark-cce-test.

Say I have the following three tables:

Machines

Id,MachineType
100001,A
100002,B
200003,B
200004,A
200005,B

Bolts

MachineType,Description
A,20 x M5
A,30 x M5
B,"2"" x 1/4"""
B,"2"" x 1/2"""
B,"2"" x 3/8"""
A,40 x M6
A,50 x M10
B,"1"" x 1/8"""

Nuts

MachineType,Description
A,M5
A,M6
B,"1/4"""
B,"1/8"""
B,"3/8"""


The objective is to create lists of Machines by Id, with all of their bolts and 
nuts listed on the same line:

100001, 20 x M5, 30 x M5, 40 x M6,50 x M10,M5,M6

The output is further categorised by the first 5 digits of the machine id, 
although that seems immaterial to this problem.
In practice I’m dealing with ~70 million machines with a couple of hundred 
thousand types - therefore Spark!

The code to do this looks like:

final Dataset<Machine> machineRecords = sparkSession
 .read()
 .format("csv")
 .option("header", "true")
 .option("inferSchema", "true")
 .load("src/main/data/machines.csv")
 .as(Encoders.bean(Machine.class))
 .persist();

final int workerCount = sparkContext.defaultParallelism();

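// Group every Nut by its MachineType into a List<Nut>, using the generic combiner helpers defined below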
final JavaPairRDD<String, List<Nut>> nutsByMachine = sparkSession
 .read()
 .format("csv")
 .option("header", "true")
 .option("inferSchema", "true")
 .load("src/main/data/nuts.csv")
 .as(Encoders.bean(Nut.class))
 .toJavaRDD()
 .mapToPair(nut -> new Tuple2<>(nut.getMachineType(), nut))
 .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
 .combineByKey(SparkCCETest::createListAndCombine, SparkCCETest::mergeValues, SparkCCETest::mergeCombiners)
 .persist(StorageLevel.MEMORY_AND_DISK());

final JavaPairRDD<String, List<Bolt>> boltsByMachine = sparkSession
 .read()
 .format("csv")
 .option("header", "true")
 .option("inferSchema", "true")
 .load("src/main/data/bolts.csv")
 .as(Encoders.bean(Bolt.class))
 .toJavaRDD()
 .mapToPair(bolt -> new Tuple2<>(bolt.getMachineType(), bolt))
 .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
 .combineByKey(SparkCCETest::createListAndCombine, SparkCCETest::mergeValues, SparkCCETest::mergeCombiners)
 .persist(StorageLevel.MEMORY_AND_DISK());

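// Join each Machine with its lists of nuts and bolts, then key the result by export file (first 5 digits of the machine id)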
machineRecords
 .toJavaRDD()
 .mapToPair(machine -> new Tuple2<>(machine.getMachineType(), machine))
 .join(nutsByMachine)
 .join(boltsByMachine)
 .map(Tuple2::_2)
 .map(tuples -> new Tuple3<>(tuples._1._1, tuples._1._2, tuples._2))
 .mapToPair(machineWithNutsBolts -> new Tuple2<>(exportFileFor(machineWithNutsBolts._1()), machineWithNutsBolts))
 .repartitionAndSortWithinPartitions(new HashPartitioner(workerCount))
 .foreachPartition(machineIterator -> { // <- line 77
     ///...
 });


static String exportFileFor(Machine machine) {
   return machine.getId().substring(0, 5);
}

static <T> List<T> createListAndCombine(T v) {
   List<T> c = new ArrayList<>();
   c.add(v);
   return c;
}

static <T> List<T> mergeValues(List<T> c, T v) {
   c.add(v);
   return c;
}

static <T> List<T> mergeCombiners(List<T> c1, List<T> c2) {
   c1.addAll(c2);
   return c1;
}

Running this yields a ClassCastException:

20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_1 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
20/05/22 14:05:31 WARN BlockManager: Putting block rdd_47_2 failed due to exception java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut.
20/05/22 14:05:31 WARN BlockManager: Block rdd_47_2 could not be removed as it was not found on disk or in memory
20/05/22 14:05:31 WARN BlockManager: Block rdd_47_1 could not be removed as it was not found on disk or in memory
20/05/22 14:05:31 ERROR Executor: Exception in task 2.0 in stage 9.0 (TID 13)
java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
...
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
20/05/22 14:05:31 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 12)
java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
...
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

…

Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1979)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1967)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1966)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1966)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:946)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:946)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:946)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2196)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2145)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2134)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:748)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2095)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2116)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2135)
at org.apache.spark.SparkContext.runJob(SparkContext.scala:2160)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$1(RDD.scala:994)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:992)
at org.apache.spark.api.java.JavaRDDLike.foreachPartition(JavaRDDLike.scala:219)
at org.apache.spark.api.java.JavaRDDLike.foreachPartition$(JavaRDDLike.scala:218)
at org.apache.spark.api.java.AbstractJavaRDDLike.foreachPartition(JavaRDDLike.scala:45)
at org.example.SparkCCETest.main(SparkCCETest.java:77)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:928)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1007)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1016)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ClassCastException: org.example.Bolt cannot be cast to org.example.Nut
at org.apache.spark.api.java.JavaPairRDD$.$anonfun$toScalaFunction$1(JavaPairRDD.scala:1041)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1(ExternalAppendOnlyMap.scala:152)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.$anonfun$insertAll$1$adapted(ExternalAppendOnlyMap.scala:151)
at org.apache.spark.util.collection.AppendOnlyMap.changeValue(AppendOnlyMap.scala:144)
at org.apache.spark.util.collection.SizeTrackingAppendOnlyMap.changeValue(SizeTrackingAppendOnlyMap.scala:32)
at org.apache.spark.util.collection.ExternalAppendOnlyMap.insertAll(ExternalAppendOnlyMap.scala:164)
at org.apache.spark.Aggregator.combineValuesByKey(Aggregator.scala:41)
at org.apache.spark.rdd.PairRDDFunctions.$anonfun$combineByKeyWithClassTag$3(PairRDDFunctions.scala:92)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:837)
at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:837)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.$anonfun$getOrCompute$1(RDD.scala:362)
at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1306)
at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1233)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1297)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1121)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:360)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:311)
at org.apache.spark.rdd.CoGroupedRDD.$anonfun$compute$2(CoGroupedRDD.scala:140)
at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
at scala.collection.immutable.List.foreach(List.scala:392)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:136)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:349)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:313)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
at org.apache.spark.scheduler.Task.run(Task.scala:127)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:455)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:458)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)


Anyway, to cut a long story short, it occurred to me while creating this 
reproducer to replace those generic methods at the bottom of the code with 
explicitly typed versions.

This made the problem go away.

This seems like a workaround, but does anyone think this could be a bug?

Thanks,

Steve C

P.S. I’m relatively new to Apache Spark so if anyone thinks I’m going about 
this the wrong way then I would be pleased to hear any better ideas.



