[ https://issues.apache.org/jira/browse/SPARK-26379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722524#comment-16722524 ]
Kailash Gupta edited comment on SPARK-26379 at 12/16/18 4:45 PM: ----------------------------------------------------------------- Full exception stack trace {code:java} 18/12/16 22:09:32 ERROR MicroBatchExecution: Query [id = ca9aaed4-e750-453a-88f9-2807365efcf2, runId = e346ac1d-3873-4922-a327-d539dfdc44bc] terminated with error org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435) at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157) at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:447) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Invalid call to dataType on unresolved object, tree: 'timestamp === Streaming Query === Identifier: [id = ca9aaed4-e750-453a-88f9-2807365efcf2, runId = e346ac1d-3873-4922-a327-d539dfdc44bc] Current Committed Offsets: {} Current Available Offsets: {FileStreamSource[file:/input]: {"logOffset":0}} Current State: ACTIVE Thread State: RUNNABLE Logical Plan: Project [word#0, count#1, current_timestamp() AS timestamp#4] +- StreamingExecutionRelation FileStreamSource[file:/input], [word#0, count#1] at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435) at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157) at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:447) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) ... 1 more 18/12/16 22:09:32 INFO SparkContext: Invoking stop() from shutdown hook{code} was (Author: kailashgupta1012): Full Exception Trace {code:java} 18/12/16 22:09:32 ERROR MicroBatchExecution: Query [id = ca9aaed4-e750-453a-88f9-2807365efcf2, runId = e346ac1d-3873-4922-a327-d539dfdc44bc] terminated with error org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435) at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157) at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:447) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Invalid call to dataType on unresolved object, tree: 'timestamp === Streaming Query === Identifier: [id = ca9aaed4-e750-453a-88f9-2807365efcf2, runId = e346ac1d-3873-4922-a327-d539dfdc44bc] Current Committed Offsets: {} Current Available Offsets: {FileStreamSource[file:/input]: {"logOffset":0}} Current State: ACTIVE Thread State: RUNNABLE Logical Plan: Project [word#0, count#1, current_timestamp() AS timestamp#4] +- StreamingExecutionRelation FileStreamSource[file:/input], [word#0, count#1] at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295) at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189) Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435) at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.immutable.List.foreach(List.scala:381) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.immutable.List.map(List.scala:285) at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435) at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157) at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:447) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271) at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58) at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121) at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56) at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117) at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279) ... 1 more 18/12/16 22:09:32 INFO SparkContext: Invoking stop() from shutdown hook{code} > Structured Streaming - Exception on adding column to Dataset > ------------------------------------------------------------ > > Key: SPARK-26379 > URL: https://issues.apache.org/jira/browse/SPARK-26379 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.3.0 > Reporter: Kailash Gupta > Priority: Major > > While using withColumn to add a column to a structured streaming Dataset, I > am getting following exception: > org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to > dataType on unresolved object, tree: 'timestamp > Following is sample code > {code:java} > final String path = "path_to_input_directory"; > final StructType schema = new StructType(new StructField[] { new > StructField("word", StringType, false, Metadata.empty()), new > StructField("count", DataTypes.IntegerType, false, Metadata.empty()) }); > SparkSession sparkSession = > SparkSession.builder().appName("StructuredStreamingIssue").master("local").getOrCreate(); > Dataset<Row> words = sparkSession.readStream().option("sep", > ",").schema(schema).csv(path); > Dataset<Row> wordsWithTimestamp = words.withColumn("timestamp", > functions.current_timestamp()); > // wordsWithTimestamp.explain(true); > StreamingQuery query = > wordsWithTimestamp.writeStream().outputMode("update").option("truncate", > "false").format("console").trigger(Trigger.ProcessingTime("2 > seconds")).start(); > query.awaitTermination();{code} > Following are the contents of the file present at _path_ > {code:java} > a,2 > c,4 > d,2 > r,1 > t,9 > {code} > This seems working with 2.2.0 release, but not with 2.3.0 and 2.4.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org