I have the following code where I'm using RDD 'union' and 'subtractByKey' to create a new baseline RDD. All of my RDDs are key pairs with the 'key' a String and the 'value' a String (an xml document).

// ******************************************************
// Merge the daily deletes/updates/adds with the baseline
// ******************************************************

// Concat the Updates, Deletes into one PairRDD
JavaPairRDD<String,String> updateDeletePairRDD = updatePairRDD.union(deletePairRDD);

// Remove the update/delete keys from the baseline
JavaPairRDD<String,String> workSubtractBaselinePairRDD = baselinePairRDD.subtractByKey(updateDeletePairRDD);

// Add in the Adds
JavaPairRDD<String,String> workAddBaselinePairRDD = workSubtractBaselinePairRDD.union(addPairRDD);
// Add in the Updates
JavaPairRDD<String,String> newBaselinePairRDD = workAddBaselinePairRDD.union(updatePairRDD);

When I go to 'count' the newBaselinePairRDD:

// Output count for new baseline
log.info("Number of new baseline records: " + newBaselinePairRDD.count());

I get the exception below (the log.info above is SparkSync.java:785). What I find odd is the reference to Spark SQL, so I'm curious whether, under the covers, the RDD union and/or subtractByKey are implemented with Spark SQL. I wouldn't think so, but thought I would ask. I'm also suspicious of the reference to the '<' character, and whether it comes from the xml document in the value portion of my key pairs. Any insights would be appreciated; if there are thoughts on how to better approach (or debug) my problem, I would be interested in those as well.

The updatePairRDD, deletePairRDD, baselinePairRDD, addPairRDD, and updateDeletePairRDD are all hash partitioned. It's also a bit difficult to trace things because my application is written in Java, and the stack trace references a lot of Scala with very few references to my own code other than the one line (SparkSync.java:785). My application does use Spark SQL for some other tasks, so perhaps part of an RDD is being re-calculated and that is producing this error; but, based on other logging statements throughout my application, I don't believe this is the case.

Thanks.
Darin.
14/11/10 22:35:27 INFO scheduler.DAGScheduler: Failed to run count at SparkSync.java:785
14/11/10 22:35:27 WARN scheduler.TaskSetManager: Lost task 0.3 in stage 40.0 (TID 10674, ip-10-149-76-35.ec2.internal): com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: java.io.StringReader@e8f759e; line: 1, column: 2]
        com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524)
        com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557)
        com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475)
        com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1415)
        com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:679)
        com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3024)
        com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)
        com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)
        org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:275)
        org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:274)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:62)
        org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:50)
        org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
        org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
        org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
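For what it's worth, here is the merge logic I'm trying to express, sketched Spark-free with plain java.util.Map (the keys and xml values are made up for illustration). One caveat on the analogy: unlike Map.putAll, RDD.union keeps duplicate keys, which is exactly why the update/delete keys are subtracted from the baseline before the adds and updates are unioned back in.

```java
import java.util.HashMap;
import java.util.Map;

public class BaselineMergeSketch {

    // Local sketch of the intended merge: drop updated/deleted keys from the
    // baseline, then add back the adds and the new versions of the updates.
    static Map<String, String> merge(Map<String, String> baseline,
                                     Map<String, String> updates,
                                     Map<String, String> deletes,
                                     Map<String, String> adds) {
        Map<String, String> result = new HashMap<>(baseline);

        // subtractByKey equivalent: remove keys present in updates or deletes
        result.keySet().removeAll(updates.keySet());
        result.keySet().removeAll(deletes.keySet());

        // union equivalents: add in the adds, then the updated records
        // (safe here only because the keys were subtracted first)
        result.putAll(adds);
        result.putAll(updates);
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> baseline = new HashMap<>();
        baseline.put("a", "<doc>old-a</doc>");
        baseline.put("b", "<doc>old-b</doc>");
        baseline.put("c", "<doc>old-c</doc>");

        Map<String, String> updates = new HashMap<>();
        updates.put("b", "<doc>new-b</doc>");

        Map<String, String> deletes = new HashMap<>();
        deletes.put("c", "<doc>old-c</doc>");

        Map<String, String> adds = new HashMap<>();
        adds.put("d", "<doc>new-d</doc>");

        Map<String, String> merged = merge(baseline, updates, deletes, adds);
        System.out.println(merged.size());   // 3 records: a, b, d
        System.out.println(merged.get("b")); // <doc>new-b</doc>
    }
}
```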