Hi Folks:
I have to add a column to a structured streaming DataFrame, but when I do that 
(using select or withColumn) I get an exception. I can add a column to a 
non-streaming structured DataFrame without any problem. I could not find any 
documentation on how to do this in the following doc:
[https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]
I am using Spark 2.4.0-SNAPSHOT.
Please let me know what I could be missing.

Thanks for your help.
(I am also attaching the source code for the structured streaming and 
non-streaming classes, along with the input file, with this email.)
<exception>
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
</exception>
Here is the input file (in the ./data directory); note that tokens are separated 
by '\t':
1	v1
2	v1
2	v2
3	v3
3	v1
Here is the non-streaming DataFrame code, which works:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

object StructuredTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .appName("StreamingTest")
      .master("local[4]")
    val spark = sparkBuilder.getOrCreate()
    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    val dataframe = spark.read.option("sep", "\t").schema(schema).csv("./data/")
    val dataframe2 = dataframe.select(expr("*"), current_timestamp().as("cts"))
    dataframe2.show(false)
    spark.stop()
  }
}
Output of the above code is:
+---+-----+-----------------------+
|id |visit|cts                    |
+---+-----+-----------------------+
|1  |v1   |2018-01-31 15:07:00.758|
|2  |v1   |2018-01-31 15:07:00.758|
|2  |v2   |2018-01-31 15:07:00.758|
|3  |v3   |2018-01-31 15:07:00.758|
|3  |v1   |2018-01-31 15:07:00.758|
+---+-----+-----------------------+

Here is the code with structured streaming which throws the exception:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest")
      .master("local[4]")
    val spark = sparkBuilder.getOrCreate()
    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    val dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    var dataframe2 = dataframeInput.select("*")
    dataframe2 = dataframe2.withColumn("cts", current_timestamp())
    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}
Here is the exception:
18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id = 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 2394a402-dd52-49b4-854e-cb46684bf4d8] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts
  at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
  at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
I've also used snippets from the Databricks structured streaming examples
(https://docs.databricks.com/spark/latest/structured-streaming/examples.html)
but still get the same exception.
Here is the code:
import scala.collection.immutable
import org.apache.spark.sql.functions._
import org.joda.time._
import org.apache.spark.sql._
import org.apache.spark.sql.types._
import org.apache.spark.sql.streaming._
import org.apache.log4j._

object StreamingTest {
  def main(args: Array[String]): Unit = {
    val sparkBuilder = SparkSession
      .builder
      .config("spark.sql.streaming.checkpointLocation", "./checkpointes")
      .appName("StreamingTest")
      .master("local[4]")
    val spark = sparkBuilder.getOrCreate()
    val schema = StructType(
      Array(
        StructField("id", StringType, false),
        StructField("visit", StringType, false)
      ))
    val dataframeInput = spark.readStream.option("sep", "\t").schema(schema).csv("./data/")
    val dataframe2 = dataframeInput.select(
      current_timestamp().cast("timestamp").alias("timestamp"),
      expr("*"))
    val query = dataframe2.writeStream.option("truncate", "false").format("console").start
    query.awaitTermination()
  }
}

And the exception:
18/01/31 15:31:00 ERROR MicroBatchExecution: Query [id = 26b2afd9-797e-49ce-b026-0bd5321536e1, runId = d8ac5386-9d59-4897-b05b-2750b29c05ca] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp
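For what it's worth, the only workaround I can think of trying next is to replace the builtin current_timestamp() with a plain UDF that stamps each row as it is processed. This is just a sketch (the processingTimestamp name is mine, and I have not verified that it avoids the analyzer error; also note a per-row UDF does not have the same fixed-per-batch semantics as current_timestamp()):

import java.sql.Timestamp
import org.apache.spark.sql.functions.udf

// Hypothetical workaround: stamp rows via a zero-argument UDF instead of
// the builtin current_timestamp(). The UDF is evaluated per row, so the
// timestamps may differ slightly within one micro-batch.
val processingTimestamp = udf(() => new Timestamp(System.currentTimeMillis()))

var dataframe2 = dataframeInput.select("*")
dataframe2 = dataframe2.withColumn("cts", processingTimestamp())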

Attachment: StreamingTest.scala
Description: Binary data

Attachment: StructuredTest.scala
Description: Binary data

1	v1
2	v1
2	v2
3	v3
3	v1