[ 
https://issues.apache.org/jira/browse/SPARK-26379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722524#comment-16722524
 ] 

Kailash Gupta commented on SPARK-26379:
---------------------------------------

Full Exception Trace
{code:java}
18/12/16 22:09:32 ERROR MicroBatchExecution: Query [id = 
ca9aaed4-e750-453a-88f9-2807365efcf2, runId = 
e346ac1d-3873-4922-a327-d539dfdc44bc] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
dataType on unresolved object, tree: 'timestamp
at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:447)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "main" 
org.apache.spark.sql.streaming.StreamingQueryException: Invalid call to 
dataType on unresolved object, tree: 'timestamp
=== Streaming Query ===
Identifier: [id = ca9aaed4-e750-453a-88f9-2807365efcf2, runId = 
e346ac1d-3873-4922-a327-d539dfdc44bc]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/input]: {"logOffset":0}}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
Project [word#0, count#1, current_timestamp() AS timestamp#4]
+- StreamingExecutionRelation FileStreamSource[file:/input], [word#0, count#1]

at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid 
call to dataType on unresolved object, tree: 'timestamp
at 
org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at 
org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.immutable.List.foreach(List.scala:381)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.immutable.List.map(List.scala:285)
at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
at 
org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:447)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
... 1 more
18/12/16 22:09:32 INFO SparkContext: Invoking stop() from shutdown hook{code}

> Structured Streaming - Exception on adding column to Dataset
> ------------------------------------------------------------
>
>                 Key: SPARK-26379
>                 URL: https://issues.apache.org/jira/browse/SPARK-26379
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Kailash Gupta
>            Priority: Major
>
> While using withColumn to add a column to a structured streaming Dataset, I 
> am getting following exception: 
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to 
> dataType on unresolved object, tree: 'timestamp
> Following is sample code
> {code:java}
> final String path = "path_to_input_directory";
> final StructType schema = new StructType(new StructField[] { new 
> StructField("word", StringType, false, Metadata.empty()), new 
> StructField("count", DataTypes.IntegerType, false, Metadata.empty()) });
> SparkSession sparkSession = 
> SparkSession.builder().appName("StructuredStreamingIssue").master("local").getOrCreate();
> Dataset<Row> words = sparkSession.readStream().option("sep", 
> ",").schema(schema).csv(path);
> Dataset<Row> wordsWithTimestamp = words.withColumn("timestamp", 
> functions.current_timestamp());
> // wordsWithTimestamp.explain(true);
> StreamingQuery query = 
> wordsWithTimestamp.writeStream().outputMode("update").option("truncate", 
> "false").format("console").trigger(Trigger.ProcessingTime("2 
> seconds")).start();
> query.awaitTermination();{code}
> Following are the contents of the file present at _path_
> {code:java}
> a,2
> c,4
> d,2
> r,1
> t,9
> {code}
> This seems working with 2.2.0 release, but not with 2.3.0 and 2.4.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to