Hi TD: Just wondering if you have any insight for me or need more info. Thanks
On Thursday, February 1, 2018 7:43 AM, M Singh <mans2si...@yahoo.com.INVALID> wrote:

Hi TD: Here is the updated code with the explain output and the full stack trace. Please let me know what the issue could be and what to look for in the explain output.

Updated code:

import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {

  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession.builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest")
      .master("local[4]")

    val spark = sparkBuilder.getOrCreate()

    val schema = StructType(Array(
      StructField("id", StringType, false),
      StructField("visit", StringType, false)
    ))

    var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select("*")
    dataframe2 = dataframe2.withColumn("cts", current_timestamp().cast("long"))
    dataframe2.explain(true)

    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}

Explain output:

== Parsed Logical Plan ==
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- AnalysisBarrier
      +- Project [id#0, visit#1]
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep -> , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Analyzed Logical Plan ==
id: string, visit: string, cts: bigint
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- Project [id#0, visit#1]
   +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep -> , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Optimized Logical Plan ==
Project [id#0, visit#1, 1517499591 AS cts#6L]
+- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false), StructField(visit,StringType,false))),List(),None,Map(sep -> , path -> ./data/),None), FileSource[./data/], [id#0, visit#1]

== Physical Plan ==
*(1) Project [id#0, visit#1, 1517499591 AS cts#6L]
+- StreamingRelation FileSource[./data/], [id#0, visit#1]
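For reference (an untested sketch I have not run side by side), the same projection could also be explained over a plain batch read of the same directory to compare against the streaming plan above, reusing the spark and schema values from the code above:

// Untested comparison sketch: explain the identical projection over a batch
// read of ./data/, to contrast its optimized plan with the streaming plan.
val batchDf = spark.read.option("sep", "\t").schema(schema).csv("./data/")
batchDf.withColumn("cts", current_timestamp().cast("long")).explain(true)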
Here is the exception:

18/02/01 07:39:52 ERROR MicroBatchExecution: Query [id = a0e573f0-e93b-48d9-989c-1aaa73539b58, runId = b5c618cb-30c7-4eff-8f09-ea1d064878ae] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
    at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
    at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:448)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:134)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:122)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Invalid call to dataType on unresolved object, tree: 'cts
=== Streaming Query ===
Identifier: [id = a0e573f0-e93b-48d9-989c-1aaa73539b58, runId = b5c618cb-30c7-4eff-8f09-ea1d064878ae]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/Users/mans.s/code/samsung/tv-analytics-pipeline-git/tv-analytics-pipeline/tv-exposure-feed/data]: {"logOffset":0}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [id#0, visit#1, cast(current_timestamp() as bigint) AS cts#6L]
+- Project [id#0, visit#1]
   +- StreamingExecutionRelation FileStreamSource[file:/Users/mans.s/code/samsung/tv-analytics-pipeline-git/tv-analytics-pipeline/tv-exposure-feed/data], [id#0, visit#1]

    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
    at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
    at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:448)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:134)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:122)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:118)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    ... 1 more

On Wednesday, January 31, 2018 3:46 PM, Tathagata Das <tathagata.das1...@gmail.com> wrote:

Could you give the full stack trace of the exception? Also, can you do `dataframe2.explain(true)` and show us the plan output?

On Wed, Jan 31, 2018 at 3:35 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi Folks:

I need to add a column to a structured streaming dataframe, but when I do that (using either select or withColumn) I get an exception. Adding the same column to a non-streaming (batch) dataframe works fine. I could not find any guidance on this in the structured streaming programming guide [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html].

I am using Spark 2.4.0-SNAPSHOT.

Please let me know what I could be missing. Thanks for your help. (I am also attaching the source code for the streaming and non-streaming classes and the input file with this email.)

<exception>
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
    at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
</exception>

Here is the input file (in the ./data directory); note the tokens are separated by '\t':

1	v1
2	v1
2	v2
3	v3
3	v1

Here is the code with the dataframe (non-streaming) which works:

import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object StructuredTest {

  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession.builder
appName("StreamingTest").maste r("local[4]") val spark = sparkBuilder.getOrCreate() val schema = StructType( Array( StructField("id", StringType, false), StructField("visit", StringType, false) )) var dataframe = spark.read.option("sep","\t"). schema(schema).csv("./data/") var dataframe2 = dataframe.select(expr("*"), current_timestamp().as("cts")) dataframe2.show(false) spark.stop() }} Output of the above code is: +---+-----+------------------- ----+|id |visit|cts |+---+-----+------------------- ----+|1 |v1 |2018-01-31 15:07:00.758||2 |v1 |2018-01-31 15:07:00.758||2 |v2 |2018-01-31 15:07:00.758||3 |v3 |2018-01-31 15:07:00.758||3 |v1 |2018-01-31 15:07:00.758|+---+-----+------------------- ----+ Here is the code with structured streaming which throws the exception: import scala.collection.immutableimport org.apache.spark.sql. functions._import org.joda.time._import org.apache.spark.sql._import org.apache.spark.sql.types._import org.apache.spark.sql. streaming._import org.apache.log4j._ object StreamingTest { def main(args:Array[String]) : Unit = { val sparkBuilder = SparkSession .builder. config("spark.sql.streaming. checkpointLocation", "./checkpointes"). appName("StreamingTest"). master("local[4]") val spark = sparkBuilder.getOrCreate() val schema = StructType( Array( StructField("id", StringType, false), StructField("visit", StringType, false) )) var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./ data/") var dataframe2 = dataframeInput.select("*") dataframe2 = dataframe2.withColumn("cts", current_timestamp()) val query = dataframe2.writeStream.option( "trucate","false").format(" console").start query.awaitTermination() }} Here is the exception: 18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id = 0fe655de-9096-4d69-b6a5- c593400d2eba, runId = 2394a402-dd52-49b4-854e- cb46684bf4d8] terminated with errororg.apache.spark.sql.catalyst. analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts at org.apache.spark.sql.catalyst. analysis.UnresolvedAttribute. dataType(unresolved.scala:105) at org.apache.spark.sql.types. StructType$$anonfun$ fromAttributes$1.apply(StructT ype.scala:435) at org.apache.spark.sql.types. StructType$$anonfun$ fromAttributes$1.apply(StructT ype.scala:435) I've also used snippets (shown in bold below) from (https://docs.databricks.com/ spark/latest/structured- streaming/examples.html)but still get the same exception: Here is the code: import scala.collection.immutableimport org.apache.spark.sql. functions._import org.joda.time._import org.apache.spark.sql._import org.apache.spark.sql.types._import org.apache.spark.sql. streaming._import org.apache.log4j._ object StreamingTest { def main(args:Array[String]) : Unit = { val sparkBuilder = SparkSession .builder. config("spark.sql.streaming. checkpointLocation", "./checkpointes"). appName("StreamingTest"). 
master("local[4]") val spark = sparkBuilder.getOrCreate() val schema = StructType( Array( StructField("id", StringType, false), StructField("visit", StringType, false) )) var dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./ data/") var dataframe2 = dataframeInput.select( current_timestamp().cast(" timestamp").alias("timestamp") , expr("*")) val query = dataframe2.writeStream.option( "trucate","false").format(" console").start query.awaitTermination() }} And the exception: 18/01/31 15:31:00 ERROR MicroBatchExecution: Query [id = 26b2afd9-797e-49ce-b026- 0bd5321536e1, runId = d8ac5386-9d59-4897-b05b- 2750b29c05ca] terminated with errororg.apache.spark.sql.catalyst. analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp ------------------------------ ------------------------------ --------- To unsubscribe e-mail: user-unsubscribe@spark.apache. org