So, looking at this a bit more, I gather the root cause is that the nested
fields are represented as rows within rows. Is that correct?  If I don't
know the size of the JSON array (it varies), using
x.getAs[Row](0).getString(0) is not really a workable solution.
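
At the moment the only workaround I can see is to flatten it by hand from
the Map-based esRDD (which does read correctly), along these lines (a rough
sketch, assuming the nested field comes back as a sequence of Maps, as the
collect output in my original message below suggests):

    // Rough sketch: flatten the variable-length pathElements manually from
    // the Map-based RDD instead of going through SparkSQL.
    val flattened = sc.esRDD("device/metric").flatMap { case (id, doc) =>
      val elements = doc("pathElements").asInstanceOf[Seq[Map[String, AnyRef]]]
      elements.map(e => (id, e("node"), e("value")))
    }
    flattened.collect.foreach(println(_))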

Is the solution to apply a lateral view + explode to this?

I have attempted to switch to a lateral view, but it looks like my syntax
is off:

    sqlContext.sql(
        "SELECT path,`timestamp`, name, value, pe.value FROM metric " +
        "lateral view explode(pathElements) a AS pe")
      .collect.foreach(println(_))
Which results in:

15/03/31 17:38:34 INFO ContextCleaner: Cleaned broadcast 0
Exception in thread "main" java.lang.RuntimeException: [1.68] failure:
``UNION'' expected but identifier view found

SELECT path,`timestamp`, name, value, pe.value FROM metric lateral
view explode(pathElements) a AS pe
                                                                   ^
    at scala.sys.package$.error(package.scala:27)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:33)
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
    at org.apache.spark.sql.SQLContext$$anonfun$1.apply(SQLContext.scala:79)
    at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:174)
    at org.apache.spark.sql.catalyst.SparkSQLParser$$anonfun$org$apache$spark$sql$catalyst$SparkSQLParser$$others$1.apply(SparkSQLParser.scala:173)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:136)
    at scala.util.parsing.combinator.Parsers$Success.map(Parsers.scala:135)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$map$1.apply(Parsers.scala:242)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1$$anonfun$apply$2.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Failure.append(Parsers.scala:202)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$Parser$$anonfun$append$1.apply(Parsers.scala:254)
    at scala.util.parsing.combinator.Parsers$$anon$3.apply(Parsers.scala:222)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
    at scala.util.parsing.combinator.Parsers$$anon$2$$anonfun$apply$14.apply(Parsers.scala:891)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
    at scala.util.parsing.combinator.Parsers$$anon$2.apply(Parsers.scala:890)
    at scala.util.parsing.combinator.PackratParsers$$anon$1.apply(PackratParsers.scala:110)
    at org.apache.spark.sql.catalyst.AbstractSparkSQLParser.apply(SparkSQLParser.scala:31)
    at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
    at org.apache.spark.sql.SQLContext$$anonfun$parseSql$1.apply(SQLContext.scala:83)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:83)
    at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:303)
    at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite$.main(ElasticSearchReadWrite.scala:97)
    at com.opsdatastore.elasticsearch.spark.ElasticSearchReadWrite.main(ElasticSearchReadWrite.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
    at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:358)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

Is this the right approach?  And is this syntax available in 1.2.1?

SELECT
  v1.name, v2.city, v2.state
FROM people
  LATERAL VIEW json_tuple(people.jsonObject, 'name', 'address') v1
     as name, address
  LATERAL VIEW json_tuple(v1.address, 'city', 'state') v2
     as city, state;
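
My reading of the parse error is that the plain SQLContext SQL parser in
1.2.1 simply doesn't understand LATERAL VIEW (or the json_tuple UDTF), and
that the HiveQL syntax above is only available through a HiveContext.
Something like the following is what I have in mind (an untested sketch,
assuming the spark-hive artifact is on the classpath and that the
CREATE TEMPORARY TABLE ... USING data-source DDL behaves the same under
HiveContext):

    import org.apache.spark.sql.hive.HiveContext

    val hiveContext = new HiveContext(sc)

    // Register the ES-backed table as before, just against the HiveContext.
    hiveContext.sql(
      "CREATE TEMPORARY TABLE metric     " +
      "USING org.elasticsearch.spark.sql " +
      "OPTIONS (resource 'device/metric')  ")

    // LATERAL VIEW / explode is HiveQL, so it should parse here even though
    // the plain SQLContext parser rejects it.
    hiveContext.sql(
      "SELECT path, `timestamp`, name, value, pe.value FROM metric " +
      "LATERAL VIEW explode(pathElements) a AS pe")
      .collect.foreach(println(_))

Does that sound right, or is there a better way to get at the nested
pathElements in 1.2.1?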


-Todd

On Tue, Mar 31, 2015 at 3:26 PM, Todd Nist <tsind...@gmail.com> wrote:

> I am accessing Elasticsearch via elasticsearch-hadoop and attempting
> to expose it via SparkSQL. I am using Spark 1.2.1, the latest supported by
> elasticsearch-hadoop, and "org.elasticsearch" % "elasticsearch-hadoop" %
> "2.1.0.BUILD-SNAPSHOT" of elasticsearch-hadoop. I'm encountering an issue
> when I attempt to query the JSON after creating a temporary table from it.
> The mapping template looks like this:
>
> PUT /_template/device
> {
>   "template": "dev*",
>   "settings": {
>     "number_of_shards": 1
>   },
>   "mappings": {
>     "metric": {
>       "_timestamp" : {
>         "enabled" : true,
>         "stored" : true,
>         "path" : "timestamp",
>         "format" : "yyyy-MM-dd'T'HH:mm:ssZZ"
>       },
>       "properties": {
>         "pathId": {
>           "type": "string"
>         },
>         "pathElements": {
>           "properties": {
>             "node": {
>               "type": "string"
>             },
>             "value": {
>               "type": "string"
>             }
>           }
>         },
>         "name": {
>           "type": "string"
>         },
>         "value": {
>           "type": "double"
>         },
>         "timestamp": {
>           "type": "date",
>           "store": true
>         }
>       }
>     }
>   }
> }
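>
> For reference, a document in this index looks roughly like this
> (reconstructed from the sample output further down, so treat it as
> illustrative; pathElements is an array of node/value objects):
>
> {
>   "path": "/PA/Pittsburgh/12345 Westbrook Drive/main/theromostat-1",
>   "pathElements": [
>     { "node": "State",  "value": "PA" },
>     { "node": "City",   "value": "Pittsburgh" },
>     { "node": "Street", "value": "12345 Westbrook Drive" },
>     { "node": "level",  "value": "main" },
>     { "node": "device", "value": "thermostat" }
>   ],
>   "name": "Current Temperature",
>   "value": 29.590943279257175,
>   "timestamp": "2015-03-27T14:53:46+0000"
> }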
>
> Querying all columns works fine except for pathElements, which is a JSON
> array. If it is added to the select, the query fails with a
> java.util.NoSuchElementException: key not found: node.
>
> *Details*.
>
> The program is pretty basic, looks like this:
>
> /**
>  * A simple sample to read and write to ES using elasticsearch-hadoop.
>  */
>
> package com.opsdatastore.elasticsearch.spark
>
> import java.io.File
>
>
> // Scala imports
> import scala.collection.JavaConversions._
> // Spark imports
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.SparkContext._
>
> import org.apache.spark.rdd.RDD
>
> import org.apache.spark.sql.{SchemaRDD,SQLContext}
>
> // ES imports
> import org.elasticsearch.spark._
> import org.elasticsearch.spark.sql._
>
> // OpsDataStore
> import com.opsdatastore.spark.utils.{Settings, Spark, ElasticSearch}
>
> object ElasticSearchReadWrite {
>
>   /**
>    * Spark specific configuration
>    */
>   def sparkInit(): SparkContext = {
>     val conf = new SparkConf().setAppName(Spark.AppName).setMaster(Spark.Master)
>     conf.set("es.nodes", ElasticSearch.Nodes)
>     conf.set("es.port", ElasticSearch.HttpPort.toString())
>     conf.set("es.index.auto.create", "true");
>     conf.set("spark.serializer", 
> "org.apache.spark.serializer.KryoSerializer");
>     conf.set("spark.executor.memory","1g")
>     conf.set("spark.kryoserializer.buffer.mb","256")
>
>     val sparkContext = new SparkContext(conf)
>     sparkContext.addJar(Spark.JarPath + jar)
>     sparkContext
>   }
>
>
>   def main(args: Array[String]) {
>
>     val sc = sparkInit
>
>     val sqlContext = new SQLContext(sc)
>     import sqlContext._
>
>     val start = System.currentTimeMillis()
>
>     // specific query, just read all for now
>     sc.esRDD(s"${ElasticSearch.Index}/${ElasticSearch.Type}", "?q=*:*")
>
>     /*
>      * Read from ES and provide some insight with Spark & SparkSQL
>      */
>     val esData = sc.esRDD("device/metric")
>
>     esData.collect.foreach(println(_))
>
>     val end = System.currentTimeMillis()
>     println(s"Total time: ${end-start} ms")
>
>     println("Create Metric Temporary Table for querying")
>     val schemaRDD = sqlContext.sql(
>           "CREATE TEMPORARY TABLE metric     " +
>           "USING org.elasticsearch.spark.sql " +
>           "OPTIONS (resource 'device/metric')  " )
>
>     System.out.println("########################################")
>     System.out.println("#      Scheam Definition               #")
>     System.out.println("########################################")
>         schemaRDD.printSchema()
>
>     System.out.println("########################################")
>     System.out.println("#      Data from SparkSQL              #")
>     System.out.println("########################################")
>
>     sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value FROM 
> metric").collect.foreach(println(_))
>   }
> }
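>
> For completeness, the relevant library dependencies in build.sbt look
> roughly like this (a sketch; the snapshot resolver for elasticsearch-hadoop
> is omitted):
>
>     libraryDependencies ++= Seq(
>       "org.apache.spark"  %% "spark-core"           % "1.2.1",
>       "org.apache.spark"  %% "spark-sql"            % "1.2.1",
>       "org.elasticsearch" %  "elasticsearch-hadoop" % "2.1.0.BUILD-SNAPSHOT"
>     )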
>
> So this works fine:
>
> val esData = sc.esRDD("device/metric")
> esData.collect.foreach(println(_))
>
> And results in this:
>
> 15/03/31 14:37:48 INFO DAGScheduler: Job 0 finished: collect at 
> ElasticSearchReadWrite.scala:67, took 4.948556 s
> (AUxxDrs4cgadF5SlaMg0,Map(pathElements -> Buffer(Map(node -> State, value -> 
> PA), Map(node -> City, value -> Pittsburgh), Map(node -> Street, value -> 
> 12345 Westbrook Drive), Map(node -> level, value -> main), Map(node -> 
> device, value -> thermostat)), value -> 29.590943279257175, name -> Current 
> Temperature, timestamp -> 2015-03-27T14:53:46+0000, path -> 
> /PA/Pittsburgh/12345 Westbrook Drive/main/theromostat-1))
>
> Yet this fails:
>
> sqlContext.sql("SELECT path, pathElements, `timestamp`, name, value FROM 
> metric").collect.foreach(println(_))
>
> With this exception:
>
> Create Metric Temporary Table for querying
> ########################################
> #      Schema Definition               #
> ########################################
> root
> ########################################
> #      Data from SparkSQL              #
> ########################################
> 15/03/31 14:37:49 INFO BlockManager: Removing broadcast 0
> 15/03/31 14:37:49 INFO BlockManager: Removing block broadcast_0
> 15/03/31 14:37:49 INFO MemoryStore: Block broadcast_0 of size 1264 dropped from memory (free 278018576)
> 15/03/31 14:37:49 INFO BlockManager: Removing block broadcast_0_piece0
> 15/03/31 14:37:49 INFO MemoryStore: Block broadcast_0_piece0 of size 864 dropped from memory (free 278019440)
> 15/03/31 14:37:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.5:57820 in memory (size: 864.0 B, free: 265.1 MB)
> 15/03/31 14:37:49 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0
> 15/03/31 14:37:49 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 192.168.1.5:57834 in memory (size: 864.0 B, free: 530.0 MB)
> 15/03/31 14:37:49 INFO ContextCleaner: Cleaned broadcast 0
> 15/03/31 14:37:49 INFO ScalaEsRowRDD: Reading from [device/metric]
> 15/03/31 14:37:49 INFO ScalaEsRowRDD: Discovered mapping {device=[mappings=[metric=[name=STRING, path=STRING, pathElements=[node=STRING, value=STRING], pathId=STRING, timestamp=DATE, value=DOUBLE]]]} for [device/metric]
> 15/03/31 14:37:49 INFO SparkContext: Starting job: collect at SparkPlan.scala:84
> 15/03/31 14:37:49 INFO DAGScheduler: Got job 1 (collect at SparkPlan.scala:84) with 1 output partitions (allowLocal=false)
> 15/03/31 14:37:49 INFO DAGScheduler: Final stage: Stage 1(collect at SparkPlan.scala:84)
> 15/03/31 14:37:49 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 14:37:49 INFO DAGScheduler: Missing parents: List()
> 15/03/31 14:37:49 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[6] at map at SparkPlan.scala:84), which has no missing parents
> 15/03/31 14:37:49 INFO MemoryStore: ensureFreeSpace(4120) called with curMem=0, maxMem=278019440
> 15/03/31 14:37:49 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.0 KB, free 265.1 MB)
> 15/03/31 14:37:49 INFO MemoryStore: ensureFreeSpace(2403) called with curMem=4120, maxMem=278019440
> 15/03/31 14:37:49 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.3 KB, free 265.1 MB)
> 15/03/31 14:37:49 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57820 (size: 2.3 KB, free: 265.1 MB)
> 15/03/31 14:37:49 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0
> 15/03/31 14:37:49 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838
> 15/03/31 14:37:49 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MappedRDD[6] at map at SparkPlan.scala:84)
> 15/03/31 14:37:49 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
> 15/03/31 14:37:49 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 192.168.1.5, NODE_LOCAL, 3731 bytes)
> 15/03/31 14:37:50 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57836 (size: 2.3 KB, free: 530.0 MB)
> 15/03/31 14:37:52 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, 192.168.1.5): java.util.NoSuchElementException: key not found: node
>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>     at scala.collection.AbstractMap.default(Map.scala:58)
>     at scala.collection.MapLike$class.apply(MapLike.scala:141)
>     at scala.collection.AbstractMap.apply(Map.scala:58)
>     at 
> org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:32)
>     at 
> org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaRowValueReader.scala:9)
>     at 
> org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaRowValueReader.scala:16)
>     at 
> org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
>     at 
> org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
>     at 
> org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:560)
>     at 
> org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:522)
>     at 
> org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
>     at 
> org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
>     at 
> org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:339)
>     at 
> org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:290)
>     at 
> org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:186)
>     at 
> org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:165)
>     at 
> org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
>     at org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
>     at 
> org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>       at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>       at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>       at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>       at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>     at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>     at 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>     at java.lang.Thread.run(Thread.java:745)
> 15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.1 in stage 1.0 (TID 2, 192.168.1.5, NODE_LOCAL, 3731 bytes)
> 15/03/31 14:37:52 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on 192.168.1.5:57834 (size: 2.3 KB, free: 530.0 MB)
> 15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.1 in stage 1.0 (TID 2) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 1]
> 15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.2 in stage 1.0 (TID 3, 192.168.1.5, NODE_LOCAL, 3731 bytes)
> 15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.2 in stage 1.0 (TID 3) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 2]
> 15/03/31 14:37:52 INFO TaskSetManager: Starting task 0.3 in stage 1.0 (TID 4, 192.168.1.5, NODE_LOCAL, 3731 bytes)
> 15/03/31 14:37:52 INFO TaskSetManager: Lost task 0.3 in stage 1.0 (TID 4) on executor 192.168.1.5: java.util.NoSuchElementException (key not found: node) [duplicate 3]
> 15/03/31 14:37:52 ERROR TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job
> 15/03/31 14:37:52 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
> 15/03/31 14:37:52 INFO TaskSchedulerImpl: Cancelling stage 1
> 15/03/31 14:37:52 INFO DAGScheduler: Job 1 failed: collect at SparkPlan.scala:84, took 3.028325 s
> Exception in thread "main" org.apache.spark.SparkException: Job aborted due 
> to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: 
> Lost task 0.3 in stage 1.0 (TID 4, 192.168.1.5): 
> java.util.NoSuchElementException: key not found: node
>       at scala.collection.MapLike$class.default(MapLike.scala:228)
>       at scala.collection.AbstractMap.default(Map.scala:58)
>       at scala.collection.MapLike$class.apply(MapLike.scala:141)
>       at scala.collection.AbstractMap.apply(Map.scala:58)
>       at 
> org.elasticsearch.spark.sql.RowValueReader$class.addToBuffer(RowValueReader.scala:32)
>       at 
> org.elasticsearch.spark.sql.ScalaRowValueReader.addToBuffer(ScalaRowValueReader.scala:9)
>       at 
> org.elasticsearch.spark.sql.ScalaRowValueReader.addToMap(ScalaRowValueReader.scala:16)
>       at 
> org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
>       at 
> org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
>       at 
> org.elasticsearch.hadoop.serialization.ScrollReader.list(ScrollReader.java:560)
>       at 
> org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:522)
>       at 
> org.elasticsearch.hadoop.serialization.ScrollReader.map(ScrollReader.java:596)
>       at 
> org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:519)
>       at 
> org.elasticsearch.hadoop.serialization.ScrollReader.readHitAsMap(ScrollReader.java:339)
>       at 
> org.elasticsearch.hadoop.serialization.ScrollReader.readHit(ScrollReader.java:290)
>       at 
> org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:186)
>       at 
> org.elasticsearch.hadoop.serialization.ScrollReader.read(ScrollReader.java:165)
>       at 
> org.elasticsearch.hadoop.rest.RestRepository.scroll(RestRepository.java:403)
>       at 
> org.elasticsearch.hadoop.rest.ScrollQuery.hasNext(ScrollQuery.java:76)
>       at 
> org.elasticsearch.spark.rdd.AbstractEsRDDIterator.hasNext(AbstractEsRDDIterator.scala:46)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:727)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
>       at 
> scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
>       at 
> scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
>       at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
>       at scala.collection.AbstractIterator.to(Iterator.scala:1157)
>       at 
> scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
>       at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
>       at 
> scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
>       at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
>       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>       at org.apache.spark.rdd.RDD$$anonfun$17.apply(RDD.scala:797)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>       at 
> org.apache.spark.SparkContext$$anonfun$runJob$4.apply(SparkContext.scala:1353)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>       at org.apache.spark.scheduler.Task.run(Task.scala:56)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>       at java.lang.Thread.run(Thread.java:745)
>
> Driver stacktrace:
>       at 
> org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>       at 
> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>       at 
> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>       at scala.Option.foreach(Option.scala:236)
>       at 
> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>       at 
> org.apache.spark.scheduler.DAGSchedulerEventProcessActor.aroundReceive(DAGScheduler.scala:1375)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:220)
>       at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> Any insights into where I am going wrong?  I'm sure it is probably
> something small; I'm just not seeing it yet.
>
> TIA for the assistance.
>
> -Todd
>
