Hi Folks: I have to add a column to a structured streaming dataframe, but when I do that (using select or withColumn) I get an exception. I can add a column to a non-streaming structured dataframe without any problem. I could not find any documentation on how to do this in the following doc: [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html]. I am using Spark 2.4.0-SNAPSHOT. Please let me know what I could be missing.
Thanks for your help. (I am also attaching the source code for the structured streaming, structured non-streaming classes and input file with this email) <exception>org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)</exception> Here is the input file (in the ./data directory) - note tokens are separated by '\t' 1 v12 v12 v23 v33 v1 Here is the code with dataframe (non-streaming) which works: import scala.collection.immutableimport org.apache.spark.sql.functions._import org.apache.spark.sql._import org.apache.spark.sql.types._ object StructuredTest { def main(args:Array[String]) : Unit = { val sparkBuilder = SparkSession .builder. appName("StreamingTest").master("local[4]") val spark = sparkBuilder.getOrCreate() val schema = StructType( Array( StructField("id", StringType, false), StructField("visit", StringType, false) )) var dataframe = spark.read.option("sep","\t").schema(schema).csv("./data/") var dataframe2 = dataframe.select(expr("*"), current_timestamp().as("cts")) dataframe2.show(false) spark.stop() }} Output of the above code is: +---+-----+-----------------------+|id |visit|cts |+---+-----+-----------------------+|1 |v1 |2018-01-31 15:07:00.758||2 |v1 |2018-01-31 15:07:00.758||2 |v2 |2018-01-31 15:07:00.758||3 |v3 |2018-01-31 15:07:00.758||3 |v1 |2018-01-31 15:07:00.758|+---+-----+-----------------------+ Here is the code with structured streaming which throws the exception: import scala.collection.immutableimport org.apache.spark.sql.functions._import org.joda.time._import org.apache.spark.sql._import org.apache.spark.sql.types._import org.apache.spark.sql.streaming._import org.apache.log4j._ object StreamingTest { def main(args:Array[String]) : Unit = { val sparkBuilder = SparkSession .builder. 
config("spark.sql.streaming.checkpointLocation", "./checkpointes"). appName("StreamingTest").master("local[4]") val spark = sparkBuilder.getOrCreate() val schema = StructType( Array( StructField("id", StringType, false), StructField("visit", StringType, false) )) var dataframeInput = spark.readStream.option("sep","\t").schema(schema).csv("./data/") var dataframe2 = dataframeInput.select("*") dataframe2 = dataframe2.withColumn("cts", current_timestamp()) val query = dataframe2.writeStream.option("trucate","false").format("console").start query.awaitTermination() }} Here is the exception: 18/01/31 15:10:25 ERROR MicroBatchExecution: Query [id = 0fe655de-9096-4d69-b6a5-c593400d2eba, runId = 2394a402-dd52-49b4-854e-cb46684bf4d8] terminated with errororg.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'cts at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435) I've also used snippets (shown in bold below) from (https://docs.databricks.com/spark/latest/structured-streaming/examples.html)but still get the same exception: Here is the code: import scala.collection.immutableimport org.apache.spark.sql.functions._import org.joda.time._import org.apache.spark.sql._import org.apache.spark.sql.types._import org.apache.spark.sql.streaming._import org.apache.log4j._ object StreamingTest { def main(args:Array[String]) : Unit = { val sparkBuilder = SparkSession .builder. config("spark.sql.streaming.checkpointLocation", "./checkpointes"). 
appName("StreamingTest").master("local[4]") val spark = sparkBuilder.getOrCreate() val schema = StructType( Array( StructField("id", StringType, false), StructField("visit", StringType, false) )) var dataframeInput = spark.readStream.option("sep","\t").schema(schema).csv("./data/") var dataframe2 = dataframeInput.select( current_timestamp().cast("timestamp").alias("timestamp"), expr("*")) val query = dataframe2.writeStream.option("trucate","false").format("console").start query.awaitTermination() }} And the exception: 18/01/31 15:31:00 ERROR MicroBatchExecution: Query [id = 26b2afd9-797e-49ce-b026-0bd5321536e1, runId = d8ac5386-9d59-4897-b05b-2750b29c05ca] terminated with errororg.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp
StreamingTest.scala
Description: Binary data
StructuredTest.scala
Description: Binary data
1 v1 2 v1 2 v2 3 v3 3 v1
--------------------------------------------------------------------- To unsubscribe e-mail: user-unsubscr...@spark.apache.org