I have the following code where I'm using RDD 'union' and 'subtractByKey' to 
create a new baseline RDD.  All of my RDDs are key/value pairs, with the 'key' a 
String and the 'value' a String (an XML document).

// ******************************************************
// Merge the daily deletes/updates/adds with the baseline
// ******************************************************

// Concat the Updates, Deletes into one PairRDD
JavaPairRDD<String,String> updateDeletePairRDD = updatePairRDD.union(deletePairRDD);

// Remove the update/delete keys from the baseline
JavaPairRDD<String,String> workSubtractBaselinePairRDD = baselinePairRDD.subtractByKey(updateDeletePairRDD);

// Add in the Adds
JavaPairRDD<String,String> workAddBaselinePairRDD = workSubtractBaselinePairRDD.union(addPairRDD);

// Add in the Updates
JavaPairRDD<String,String> newBaselinePairRDD = workAddBaselinePairRDD.union(updatePairRDD);

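To make the intended result of the merge concrete, here is a minimal plain-Java sketch of the same steps with no Spark dependency. It only models the semantics I expect (union concatenates and keeps duplicates, subtractByKey drops every pair whose key appears in the other collection). The class name BaselineMergeSketch, the helpers pair/union/subtractByKey/merge, and the sample keys and XML values are all made up for illustration and are not part of my application or of the Spark API.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, Spark-free model of the baseline merge (illustration only).
public class BaselineMergeSketch {

    // Helper to build a (key, xml) pair.
    static Map.Entry<String, String> pair(String key, String xml) {
        return new SimpleEntry<>(key, xml);
    }

    // Models union: concatenates both inputs and keeps duplicates.
    static List<Map.Entry<String, String>> union(
            List<Map.Entry<String, String>> a, List<Map.Entry<String, String>> b) {
        List<Map.Entry<String, String>> out = new ArrayList<>(a);
        out.addAll(b);
        return out;
    }

    // Models subtractByKey: keeps pairs of left whose key is absent from right.
    static List<Map.Entry<String, String>> subtractByKey(
            List<Map.Entry<String, String>> left, List<Map.Entry<String, String>> right) {
        Set<String> keysToRemove = new HashSet<>();
        for (Map.Entry<String, String> e : right) {
            keysToRemove.add(e.getKey());
        }
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (Map.Entry<String, String> e : left) {
            if (!keysToRemove.contains(e.getKey())) {
                out.add(e);
            }
        }
        return out;
    }

    // Same four steps as the Spark code: (baseline - (updates + deletes)) + adds + updates.
    static List<Map.Entry<String, String>> merge(
            List<Map.Entry<String, String>> baseline,
            List<Map.Entry<String, String>> adds,
            List<Map.Entry<String, String>> updates,
            List<Map.Entry<String, String>> deletes) {
        List<Map.Entry<String, String>> updateDelete = union(updates, deletes);
        List<Map.Entry<String, String>> workSubtract = subtractByKey(baseline, updateDelete);
        List<Map.Entry<String, String>> workAdd = union(workSubtract, adds);
        return union(workAdd, updates);
    }

    public static void main(String[] args) {
        // Made-up sample data: k2 is updated, k3 is deleted, k4 is added.
        List<Map.Entry<String, String>> baseline = Arrays.asList(
                pair("k1", "<doc v=\"1\"/>"), pair("k2", "<doc v=\"1\"/>"), pair("k3", "<doc v=\"1\"/>"));
        List<Map.Entry<String, String>> adds = Arrays.asList(pair("k4", "<doc v=\"1\"/>"));
        List<Map.Entry<String, String>> updates = Arrays.asList(pair("k2", "<doc v=\"2\"/>"));
        List<Map.Entry<String, String>> deletes = Arrays.asList(pair("k3", ""));

        List<Map.Entry<String, String>> newBaseline = merge(baseline, adds, updates, deletes);
        System.out.println("Number of new baseline records: " + newBaseline.size());
        for (Map.Entry<String, String> e : newBaseline) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
    }
}
```

With the sample data, the untouched key, the added key, and the updated key (with its new value) survive, and the deleted key is gone. That is the record set I expect the Spark version to count.
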
When I go to 'count' the newBaselinePairRDD:

// Output count for new baseline
log.info("Number of new baseline records: " + newBaselinePairRDD.count());

I'm getting the following exception (the above log.info is SparkSync.java:785). 
What I find odd is the reference to Spark SQL.  So, I'm curious as to whether, 
under the covers, the RDD union and/or subtractByKey are implemented in terms of 
Spark SQL.  I wouldn't think so, but thought I would ask.  I'm also suspicious of 
the reference to the '<' and whether that is because of the XML document in the 
value portion of the key/value pair.  Any insights would be appreciated.  If 
there are thoughts on how to better approach my problem (or even how to debug 
it), I would be interested in those as well.  The updatePairRDD, deletePairRDD, 
baselinePairRDD, addPairRDD, and updateDeletePairRDD are all 'hashPartitioned'.
It's also a bit difficult to trace things because my application is a Java 
application, while the stack trace references a lot of Scala and contains only 
one reference to my application (SparkSync.java:785).  My application is using 
Spark SQL for some other tasks, so perhaps (part of) an RDD is being 
recalculated and that is resulting in this error.  But, based on other logging 
statements throughout my application, I don't believe this is the case.
Thanks.
Darin.

14/11/10 22:35:27 INFO scheduler.DAGScheduler: Failed to run count at SparkSync.java:785
14/11/10 22:35:27 WARN scheduler.TaskSetManager: Lost task 0.3 in stage 40.0 (TID 10674, ip-10-149-76-35.ec2.internal): com.fasterxml.jackson.core.JsonParseException: Unexpected character ('<' (code 60)): expected a valid value (number, String, array, object, 'true', 'false' or 'null')
 at [Source: java.io.StringReader@e8f759e; line: 1, column: 2]
        com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1524)
        com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:557)
        com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:475)
        com.fasterxml.jackson.core.json.ReaderBasedJsonParser._handleOddValue(ReaderBasedJsonParser.java:1415)
        com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:679)
        com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3024)
        com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:2971)
        com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2091)
        org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:275)
        org.apache.spark.sql.json.JsonRDD$$anonfun$parseJson$1$$anonfun$apply$5.apply(JsonRDD.scala:274)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
        org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:62)
        org.apache.spark.sql.columnar.InMemoryRelation$$anonfun$1$$anon$1.next(InMemoryColumnarTableScan.scala:50)
        org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:236)
        org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:163)
        org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:70)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
        org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
        org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        org.apache.spark.scheduler.Task.run(Task.scala:54)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
