Looking at this a bit more, I gather the root cause is that the nested fields are represented as rows within rows, is that correct? Since I don't know the size of the JSON array up front (it varies), x.getAs[Row](0).getString(0) is not really a workable solution.
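If that is the case, I was thinking something along these lines might handle the variable-length case, by pulling pathElements back as a Seq[Row] and mapping over it rather than indexing into a fixed position. This is just an untested sketch; the column positions and the (node, value) field order are assumptions based on my mapping:

    import org.apache.spark.sql.Row

    // Untested sketch -- assumes pathElements comes back as a Seq[Row] through
    // the SQL path, and that the struct fields are ordered (node, value) as in
    // the mapping above.
    val results = sqlContext.sql("SELECT path, pathElements FROM metric")

    results.map { row =>
      val path  = row.getString(0)                  // column 0 = path, per the SELECT
      val elems = row.getAs[Seq[Row]](1)            // column 1 = pathElements, length varies
      val pairs = elems.map(e => (e.getString(0), e.getString(1)))   // (node, value)
      (path, pairs)
    }.collect.foreach(println(_))

That would at least avoid having to know the array size ahead of time.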
Is the solution to apply a lateral view + explode to this? I attempted to change the query to a lateral view, but it looks like my syntax is off:

    sqlContext.sql(
      "SELECT path, `timestamp`, name, value, pe.value FROM metric lateral view explode(pathElements) a AS pe")
      .collect.foreach(println(_))

Which results in:

15/03/31 17:38:34 INFO ContextCleaner: Cleaned broadcast 0
Exception in thread "main" java.lang.RuntimeException: [1.68] failure: ``UNION'' expected but identifier view found

SELECT path,`timestamp`, name, value, pe.value FROM metric lateral view explode(pathElements) a AS pe
                                                                   ^
        at scala.sys.package$.error(package.scala:27)
        at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
        at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
        at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
        at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
        at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
        at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
        at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
        at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
        at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
        at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
        at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
        at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
        at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
        at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
        at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
        at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
        at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:97)
        at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:483)
        at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Is this the right approach? Is this syntax available in 1.2.1?

    SELECT v1.name, v2.city, v2.state
    FROM people
      LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1 as name, address
      LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2 as city, state;
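Alternatively, from what I can tell the basic SQLContext parser in 1.2.1 does not understand LATERAL VIEW at all, so one thing I am considering is running the same statement through a HiveContext, which uses the HiveQL parser. This is an untested sketch; it assumes both a Hive-enabled Spark build (spark-hive on the classpath) and that the elasticsearch-hadoop temporary table can be registered against the HiveContext the same way it is against the plain SQLContext:

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)

    // Assumption: the data sources DDL (CREATE TEMPORARY TABLE ... USING ...) is
    // accepted by HiveContext in 1.2.1 the same way it is by SQLContext.
    hiveContext.sql(
      "CREATE TEMPORARY TABLE metric " +
      "USING org.elasticsearch.spark.sql " +
      "OPTIONS (resource 'device/metric')")

    // LATERAL VIEW and explode are HiveQL constructs, so they should at least be
    // parsed here rather than rejected the way the basic SQL parser rejected them above.
    hiveContext.sql(
      "SELECT path, `timestamp`, name, value, pe.node, pe.value " +
      "FROM metric LATERAL VIEW explode(pathElements) a AS pe")
      .collect.foreach(println(_))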
-Todd

On Tue, Mar 31, 2015 at 3:26 PM, Todd Nist <tsind...@gmail.com> wrote:

> I am accessing Elasticsearch via elasticsearch-hadoop and attempting to
> expose it via Spark SQL. I am using Spark 1.2.1, the latest supported by
> elasticsearch-hadoop, with "org.elasticsearch" % "elasticsearch-hadoop" %
> "2.1.0.BUILD-SNAPSHOT". I'm encountering an issue when I attempt to query
> the following JSON after creating a temporary table from it. The JSON
> looks like this:
>
> PUT /_template/device
> {
>   "template": "dev*",
>   "settings": {
>     "number_of_shards": 1
>   },
>   "mappings": {
>     "metric": {
>       "_timestamp" : {
>         "enabled" : true,
>         "stored" : true,
>         "path" : "timestamp",
>         "format" : "yyyy-MM-dd'T'HH:mm:ssZZ"
>       },
>       "properties": {
>         "pathId": {
>           "type": "string"
>         },
>         "pathElements": {
>           "properties": {
>             "node": {
>               "type": "string"
>             },
>             "value": {
>               "type": "string"
>             }
>           }
>         },
>         "name": {
>           "type": "string"
>         },
>         "value": {
>           "type": "double"
>         },
>         "timestamp": {
>           "type": "date",
>           "store": true
>         }
>       }
>     }
>   }
> }
>
> Querying all columns works fine except for pathElements, which is a JSON
> array. If it is added to the select it fails with a
> java.util.NoSuchElementException: key not found: node.
>
> *Details*
>
> The program is pretty basic and looks like this:
>
> /**
>  * A simple sample to read and write to ES using elasticsearch-hadoop.
>  */
> package com.opsdatastore.elasticsearch.spark
>
> import java.io.File
>
> // Scala imports
> import scala.collection.JavaConversions._
>
> // Spark imports
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.SparkContext._
> import org.apache.spark.rdd.RDD
> import org.apache.spark.sql.{SchemaRDD, SQLContext}
>
> // ES imports
> import org.elasticsearch.spark._
> import org.elasticsearch.spark.sql._
>
> // OpsDataStore
> import com.opsdatastore.spark.utils.{Settings, Spark, ElasticSearch}
>
> object ElasticSearchReadWrite {
>
>   /**
>    * Spark specific configuration
>    */
>   def sparkInit(): SparkContext = {
>     val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
>     conf.set("es.nodes", ElasticSearch.Nodes)
>     conf.set("es.port", ElasticSearch.HttpPort.toString())
>     conf.set("es.index.auto.create", "true")
>     conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>     conf.set("spark.executor.memory", "1g")
>     conf.set("spark.kryoserializer.buffer.mb", "256")
>
>     val sparkContext = new SparkContext(conf)
>     sparkContext.addJar(Spark.JarPath + jar)
>     sparkContext
>   }
>
>   def main(args: Array[String]) {
>
>     val sc = sparkInit
>
>     val sqlContext = new SQLContext(sc)
>     import sqlContext._
>
>     val start = System.currentTimeMillis()
>
>     // specific query, just read all for now
>     sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}", "?q=*:*")
>
>     /*
>      * Read from ES and provide some insight with Spark & SparkSQL
>      */
>     val esData = sc.esRDD("device/metric")
>
>     esData.collect.foreach(println(_))
>
>     val end = System.currentTimeMillis()
>     println(s"Total time: ${end-start} ms")
>
>     println("Create Metric Temporary Table for querying")
>     val schemaRDD = sqlContext.sql(
>       "CREATE TEMPORARY TABLE metric " +
>       "USING org.elasticsearch.spark.sql " +
>       "OPTIONS (resource 'device/metric') ")
>
>     System.out.println("########################################")
>     System.out.println("# Schema Definition                    #")
>     System.out.println("########################################")
>     schemaRDD.printSchema()
>
>     System.out.println("########################################")
>     System.out.println("# Data from SparkSQL                   #")
>     System.out.println("########################################")
>
>     sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value FROM metric").collect.foreach(println(_))
>   }
> }
>
> So this works fine:
>
>     sc.esRDD("device/metric")
>     esData.collect.foreach(println(_))
>
> And results in this:
>
> 15/03/31 14:37:48 INFO DAGScheduler: Job 0 finished: collect at ElasticSearchReadWrite.scala:67, took 4.948556 s
> (AUxxDrs4cgadF5SlaMg0,Map(pathElements -> Buffer(Map(node -> State, value -> PA), Map(node -> City, value -> Pittsburgh), Map(node -> Street, value -> 12345 Westbrook Drive), Map(node -> level, value -> main), Map(node -> device, value -> thermostat)), value -> 29.590943279257175, name -> Current Temperature, timestamp -> 2015-03-27T14:53:46+0000, path -> /PA/Pittsburgh/12345 Westbrook Drive/main/theromostat-1))
>
> Yet this fails:
>
>     sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value FROM metric").collect.foreach(println(_))
>
> With this exception:
>
> Create Metric Temporary Table for querying
> ########################################
> # Schema Definition                    #
> ########################################
> root
> ########################################
> # Data from SparkSQL                   #
> ########################################
> 15/03/31 14:37:49 INFO BlockManager: Removing broadcast 0
> 15/03/31 14:37:49 INFO BlockManager: Removing block broadcast_0
> 15/03/31 14:37:49 INFO MemoryStore: Block broadcast_0 of size 1264 dropped from memory (free 278018576)
> 15/03/31 14:37:49 INFO BlockManager: Removing block broadcast_0_piece0
> 15/03/31 14:37:49 INFO MemoryStore: Block broadcast_0_piece0 of size 864 dropped from memory (free 278019440)
> 15/03/31 14:37:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.5:57820 in memory (size: 864.0 B, free: 265.1 MB)
> 15/03/31 14:37:49 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
> 15/03/31 14:37:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.5:57834 in memory (size: 864.0 B, free: 530.0 MB)
> 15/03/31 14:37:49 INFO ContextCleaner: Cleaned broadcast 0
> 15/03/31 14:37:49 INFO ScalaEsRowRDD: Reading from [device/metric]
> 15/03/31 14:37:49 INFO ScalaEsRowRDD: Discovered mapping {device=[mappings=[metric=[name=STRING, path=STRING, pathElements=[node=STRING, value=STRING], pathId=STRING, timestamp=DATE, value=DOUBLE]]]} for [device/metric]
> 15/03/31 14:37:49 INFO SparkContext: Starting job: collect at SparkPlan.scala:84
> 15/03/31 14:37:49 INFO DAGScheduler: Got job 1 (collect at SparkPlan.scala:84) with 1 output partitions (allowLocal=false)
> 15/03/31 14:37:49 INFO DAGScheduler: Final stage: Stage 1(collect at SparkPlan.scala:84)
> 15/03/31 14:37:49 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 14:37:49 INFO DAGScheduler: Missing parents: List()
> 15/03/31 14:37:49 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[6] at map at SparkPlan.scala:84), which has no missing parents
> 15/03/31 14:37:49 INFO MemoryStore: ensureFreeSpace(4120) called with curMem=0, maxMem=278019440
> 15/03/31 14:37:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.0 KB, free 265.1 MB)
> 15/03/31 14:37:49 INFO MemoryStore: ensureFreeSpace(2403) called with curMem=4120, maxMem=278019440
> 15/03/31 14:37:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 265.1 MB)
> 15/03/31 14:37:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57820 (size: 2.3 KB, free: 265.1 MB)
> 15/03/31 14:37:49 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
> 15/03/31 14:37:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
> 15/03/31 14:37:49 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[6] at map at SparkPlan.scala:84)
> 15/03/31 14:37:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
> 15/03/31 14:37:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 192.168.1.5, NODE_LOCAL, 3731 bytes)
> 15/03/31 14:37:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57836 (size: 2.3 KB, free: 530.0 MB)
> 15/03/31 14:37:52 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 192.168.1.5): java.util.NoSuchElementException: key not found: node
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:58)
>         at scala.collection.MapLike$class.apply(MapLike.scala:141)
>         at scala.collection.AbstractMap.apply(Map.scala:58)
>         at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:32)
>         at org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaRowValueReader.scala:9)
>         at org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaRowValueReader.scala:16)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:560)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:522)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:339)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:290)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:186)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:165)
>         at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
>         at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
>         at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>         at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> 15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 2, 192.168.1.5, NODE_LOCAL, 3731 bytes)
> 15/03/31 14:37:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57834 (size: 2.3 KB, free: 530.0 MB)
> 15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.1 in stage 1.0 (TID 2) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 1]
> 15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.2 in stage 1.0 (TID 3, 192.168.1.5, NODE_LOCAL, 3731 bytes)
> 15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.2 in stage 1.0 (TID 3) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 2]
> 15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.3 in stage 1.0 (TID 4, 192.168.1.5, NODE_LOCAL, 3731 bytes)
> 15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.3 in stage 1.0 (TID 4) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 3]
> 15/03/31 14:37:52 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
> 15/03/31 14:37:52 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
> 15/03/31 14:37:52 INFO TaskSchedulerImpl: Cancelling stage 1
> 15/03/31 14:37:52 INFO DAGScheduler: Job 1 failed: collect at SparkPlan.scala:84, took 3.028325 s
>
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, 192.168.1.5): java.util.NoSuchElementException: key not found: node
>         at scala.collection.MapLike$class.default(MapLike.scala:228)
>         at scala.collection.AbstractMap.default(Map.scala:58)
>         at scala.collection.MapLike$class.apply(MapLike.scala:141)
>         at scala.collection.AbstractMap.apply(Map.scala:58)
>         at org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:32)
>         at org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaRowValueReader.scala:9)
>         at org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaRowValueReader.scala:16)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:560)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:522)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:339)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:290)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:186)
>         at org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:165)
>         at org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
>         at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
>         at org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
>         at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>         at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>         at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>         at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>         at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>         at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>         at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>         at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>         at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>         at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>         at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>         at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>         at org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>         at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>         at scala.Option.foreach(Option.scala:236)
>         at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>         at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>         at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Any insights into where I am off? I'm sure it is probably something
> small, just not seeing it yet.
>
> TIA for the assistance.
>
> -Todd