Hi TD:
Just wondering if you have any insight for me or need more info.
Thanks 

    On Thursday, February 1, 2018 7:43 AM, M Singh 
<mans2si...@yahoo.com.INVALID> wrote:
 

 Hi TD:
Here is the udpated code with explain and full stack trace.
Please let me know what could be the issue and what to look for in the explain 
output.
Updated code:
import scala.collection.immutableimport org.apache.spark.sql.functions._import 
org.joda.time._import org.apache.spark.sql._import 
org.apache.spark.sql.types._import org.apache.spark.sql.streaming._import 
org.apache.log4j._
object StreamingTest {  def main(args:Array[String]) : Unit = {    val 
sparkBuilder = SparkSession      .builder.      
config("spark.sql.streaming.checkpointLocation", "./checkpointes").      
appName("StreamingTest").master("local[4]")          val spark = 
sparkBuilder.getOrCreate()       val schema = StructType(        Array(         
 StructField("id", StringType, false),           StructField("visit", 
StringType, false)          ))    var dataframeInput = 
spark.readStream.option("sep","\t").schema(schema).csv("./data/")    var 
dataframe2 = dataframeInput.select("*")    dataframe2 = 
dataframe2.withColumn("cts", current_timestamp().cast("long"))    
dataframe2.explain(true)    val query = 
dataframe2.writeStream.option("trucate","false").format("console").start    
query.awaitTermination()  }}
Explain output:
== Parsed Logical Plan ==Project [id#0, visit#1, cast(current_timestamp() as 
bigint) AS cts#6L]+- AnalysisBarrier      +- Project [id#0, visit#1]         +- 
StreamingRelation 
DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false),
 StructField(visit,StringType,false))),List(),None,Map(sep ->  , path -> 
./data/),None), FileSource[./data/], [id#0, visit#1]
== Analyzed Logical Plan ==id: string, visit: string, cts: bigintProject [id#0, 
visit#1, cast(current_timestamp() as bigint) AS cts#6L]+- Project [id#0, 
visit#1]   +- StreamingRelation 
DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false),
 StructField(visit,StringType,false))),List(),None,Map(sep ->  , path -> 
./data/),None), FileSource[./data/], [id#0, visit#1]
== Optimized Logical Plan ==Project [id#0, visit#1, 1517499591 AS cts#6L]+- 
StreamingRelation 
DataSource(org.apache.spark.sql.SparkSession@3c74aa0d,csv,List(),Some(StructType(StructField(id,StringType,false),
 StructField(visit,StringType,false))),List(),None,Map(sep ->  , path -> 
./data/),None), FileSource[./data/], [id#0, visit#1]
== Physical Plan ==*(1) Project [id#0, visit#1, 1517499591 AS cts#6L]+- 
StreamingRelation FileSource[./data/], [id#0, visit#1]
Here is the exception:
18/02/01 07:39:52 ERROR MicroBatchExecution: Query [id = 
a0e573f0-e93b-48d9-989c-1aaa73539b58, runId = 
b5c618cb-30c7-4eff-8f09-ea1d064878ae] terminated with 
errororg.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call 
to dataType on unresolved object, tree: 'cts at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
 at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
 at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157) 
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:448)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:134)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
 at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
 at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:122)
 at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:118)
 at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
 at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)Exception
 in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: 
Invalid call to dataType on unresolved object, tree: 'cts=== Streaming Query 
===Identifier: [id = a0e573f0-e93b-48d9-989c-1aaa73539b58, runId = 
b5c618cb-30c7-4eff-8f09-ea1d064878ae]Current Committed Offsets: {}Current 
Available Offsets: 
{FileStreamSource[file:/Users/mans.s/code/samsung/tv-analytics-pipeline-git/tv-analytics-pipeline/tv-exposure-feed/data]:
 {"logOffset":0}}
Current State: ACTIVEThread State: RUNNABLE
Logical Plan:Project [id#0, visit#1, cast(current_timestamp() as bigint) AS 
cts#6L]+- Project [id#0, visit#1]   +- StreamingExecutionRelation 
FileStreamSource[file:/Users/mans.s/code/samsung/tv-analytics-pipeline-git/tv-analytics-pipeline/tv-exposure-feed/data],
 [id#0, visit#1]
 at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
 at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)Caused
 by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call 
to dataType on unresolved object, tree: 'cts at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
 at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
 at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at 
scala.collection.AbstractTraversable.map(Traversable.scala:104) at 
org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435) at 
org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
 at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157) 
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:448)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:134)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:122)
 at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
 at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:122)
 at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
 at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:118)
 at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
 ... 1 more 

    On Wednesday, January 31, 2018 3:46 PM, Tathagata Das 
<tathagata.das1...@gmail.com> wrote:
 

 Could you give the full stack trace of the exception?
Also, can you do `dataframe2.explain(true)` and show us the plan output?



On Wed, Jan 31, 2018 at 3:35 PM, M Singh <mans2si...@yahoo.com.invalid> wrote:

Hi Folks:
I have to add a column to a structured streaming dataframe but when I do that 
(using select or withColumn) I get an exception.  I can add a column in 
structured non-streaming structured dataframe. I could not find any 
documentation on how to do this in the following doc  
[https://spark.apache.org/ docs/latest/structured- streaming-programming-guide. 
html]
I am using spark 2.4.0-SNAPSHOT
Please let me know what I could be missing.

Thanks for your help.
(I am also attaching the source code for the structured streaming, structured 
non-streaming classes and input file with this email)
<exception>org.apache.spark.sql.catalyst. analysis.UnresolvedException: Invalid 
call to dataType on unresolved object, tree: 'cts at 
org.apache.spark.sql.catalyst. analysis.UnresolvedAttribute. 
dataType(unresolved.scala:105) at org.apache.spark.sql.types. 
StructType$$anonfun$ fromAttributes$1.apply(StructT ype.scala:435)</exception>
Here is the input file (in the ./data directory) - note tokens are separated by 
'\t'
1 v12 v12 v23 v33 v1
Here is the code with dataframe (non-streaming) which works:
import scala.collection.immutableimport org.apache.spark.sql. functions._import 
org.apache.spark.sql._import org.apache.spark.sql.types._
object StructuredTest {  def main(args:Array[String]) : Unit = {    val 
sparkBuilder = SparkSession      .builder.      appName("StreamingTest").maste 
r("local[4]")          val spark = sparkBuilder.getOrCreate()       val schema 
= StructType(        Array(          StructField("id", StringType, false),      
     StructField("visit", StringType, false)          ))    var dataframe = 
spark.read.option("sep","\t"). schema(schema).csv("./data/")    var dataframe2 
= dataframe.select(expr("*"), current_timestamp().as("cts"))    
dataframe2.show(false)    spark.stop()      }}
Output of the above code is:
+---+-----+------------------- ----+|id |visit|cts                    
|+---+-----+------------------- ----+|1  |v1   |2018-01-31 15:07:00.758||2  |v1 
  |2018-01-31 15:07:00.758||2  |v2   |2018-01-31 15:07:00.758||3  |v3   
|2018-01-31 15:07:00.758||3  |v1   |2018-01-31 
15:07:00.758|+---+-----+------------------- ----+

Here is the code with structured streaming which throws the exception:
import scala.collection.immutableimport org.apache.spark.sql. functions._import 
org.joda.time._import org.apache.spark.sql._import 
org.apache.spark.sql.types._import org.apache.spark.sql. streaming._import 
org.apache.log4j._
object StreamingTest {  def main(args:Array[String]) : Unit = {    val 
sparkBuilder = SparkSession      .builder.      config("spark.sql.streaming. 
checkpointLocation", "./checkpointes").      appName("StreamingTest"). 
master("local[4]")          val spark = sparkBuilder.getOrCreate()       val 
schema = StructType(        Array(          StructField("id", StringType, 
false),           StructField("visit", StringType, false)          ))    var 
dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./ 
data/")    var dataframe2 = dataframeInput.select("*")    dataframe2 = 
dataframe2.withColumn("cts", current_timestamp())    val query = 
dataframe2.writeStream.option( "trucate","false").format(" console").start    
query.awaitTermination()  }}
Here is the exception:
18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id = 
0fe655de-9096-4d69-b6a5- c593400d2eba, runId = 2394a402-dd52-49b4-854e- 
cb46684bf4d8] terminated with errororg.apache.spark.sql.catalyst. 
analysis.UnresolvedException: Invalid call to dataType on unresolved object, 
tree: 'cts at org.apache.spark.sql.catalyst. analysis.UnresolvedAttribute. 
dataType(unresolved.scala:105) at org.apache.spark.sql.types. 
StructType$$anonfun$ fromAttributes$1.apply(StructT ype.scala:435) at 
org.apache.spark.sql.types. StructType$$anonfun$ fromAttributes$1.apply(StructT 
ype.scala:435)
I've also used snippets (shown in bold below) from 
(https://docs.databricks.com/ spark/latest/structured- 
streaming/examples.html)but still get the same exception:
Here is the code:
import scala.collection.immutableimport org.apache.spark.sql. functions._import 
org.joda.time._import org.apache.spark.sql._import 
org.apache.spark.sql.types._import org.apache.spark.sql. streaming._import 
org.apache.log4j._
object StreamingTest {  def main(args:Array[String]) : Unit = {    val 
sparkBuilder = SparkSession      .builder.      config("spark.sql.streaming. 
checkpointLocation", "./checkpointes").      appName("StreamingTest"). 
master("local[4]")          val spark = sparkBuilder.getOrCreate()       val 
schema = StructType(        Array(          StructField("id", StringType, 
false),           StructField("visit", StringType, false)          ))    var 
dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./ 
data/")    var dataframe2 = dataframeInput.select(      
current_timestamp().cast(" timestamp").alias("timestamp") ,      expr("*"))    
val query = dataframe2.writeStream.option( "trucate","false").format(" 
console").start
    query.awaitTermination()  }}

And the exception:
18/01/31 15:31:00 ERROR MicroBatchExecution: Query [id = 
26b2afd9-797e-49ce-b026- 0bd5321536e1, runId = d8ac5386-9d59-4897-b05b- 
2750b29c05ca] terminated with errororg.apache.spark.sql.catalyst. 
analysis.UnresolvedException: Invalid call to dataType on unresolved object, 
tree: 'timestamp


------------------------------ ------------------------------ ---------
To unsubscribe e-mail: user-unsubscribe@spark.apache. org




   

   

Reply via email to