[
https://issues.apache.org/jira/browse/SPARK-26379?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722524#comment-16722524
]
Kailash Gupta commented on SPARK-26379:
---------------------------------------
Full Exception Trace
{code:java}
18/12/16 22:09:32 ERROR MicroBatchExecution: Query [id = ca9aaed4-e750-453a-88f9-2807365efcf2, runId = e346ac1d-3873-4922-a327-d539dfdc44bc] terminated with error
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp
	at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
	at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
	at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:447)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Invalid call to dataType on unresolved object, tree: 'timestamp
=== Streaming Query ===
Identifier: [id = ca9aaed4-e750-453a-88f9-2807365efcf2, runId = e346ac1d-3873-4922-a327-d539dfdc44bc]
Current Committed Offsets: {}
Current Available Offsets: {FileStreamSource[file:/input]: {"logOffset":0}}
Current State: ACTIVE
Thread State: RUNNABLE
Logical Plan:
Project [word#0, count#1, current_timestamp() AS timestamp#4]
+- StreamingExecutionRelation FileStreamSource[file:/input], [word#0, count#1]
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'timestamp
	at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
	at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
	at org.apache.spark.sql.types.StructType$$anonfun$fromAttributes$1.apply(StructType.scala:435)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
	at scala.collection.immutable.List.map(List.scala:285)
	at org.apache.spark.sql.types.StructType$.fromAttributes(StructType.scala:435)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.schema$lzycompute(QueryPlan.scala:157)
	at org.apache.spark.sql.catalyst.plans.QueryPlan.schema(QueryPlan.scala:157)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:447)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
	... 1 more
18/12/16 22:09:32 INFO SparkContext: Invoking stop() from shutdown hook{code}
> Structured Streaming - Exception on adding column to Dataset
> ------------------------------------------------------------
>
> Key: SPARK-26379
> URL: https://issues.apache.org/jira/browse/SPARK-26379
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.3.0
> Reporter: Kailash Gupta
> Priority: Major
>
> While using withColumn to add a column to a Structured Streaming Dataset, I
> get the following exception:
> org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to
> dataType on unresolved object, tree: 'timestamp
> The following sample code reproduces it:
> {code:java}
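> import org.apache.spark.sql.Dataset;
> import org.apache.spark.sql.Row;
> import org.apache.spark.sql.SparkSession;
> import org.apache.spark.sql.functions;
> import org.apache.spark.sql.streaming.StreamingQuery;
> import org.apache.spark.sql.streaming.Trigger;
> import org.apache.spark.sql.types.DataTypes;
> import org.apache.spark.sql.types.Metadata;
> import org.apache.spark.sql.types.StructField;
> import org.apache.spark.sql.types.StructType;
>
> // The statements below belong inside a method declared with "throws Exception".
> final String path = "path_to_input_directory";
> // Schema of the CSV input: a word and its count, both non-nullable.
> final StructType schema = new StructType(new StructField[] {
>     new StructField("word", DataTypes.StringType, false, Metadata.empty()),
>     new StructField("count", DataTypes.IntegerType, false, Metadata.empty()) });
> SparkSession sparkSession = SparkSession.builder()
>     .appName("StructuredStreamingIssue").master("local").getOrCreate();
> Dataset<Row> words = sparkSession.readStream().option("sep", ",").schema(schema).csv(path);
> // Adding this column is the step that leads to the exception.
> Dataset<Row> wordsWithTimestamp = words.withColumn("timestamp", functions.current_timestamp());
> // wordsWithTimestamp.explain(true);
> StreamingQuery query = wordsWithTimestamp.writeStream()
>     .outputMode("update").option("truncate", "false").format("console")
>     .trigger(Trigger.ProcessingTime("2 seconds")).start();
> query.awaitTermination();{code}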
> The following are the contents of the file present at _path_:
> {code:java}
> a,2
> c,4
> d,2
> r,1
> t,9
> {code}
> This seems to work with the 2.2.0 release, but not with 2.3.0 or 2.4.0.
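
To make the report easier to reproduce, the sample input quoted above can be generated programmatically. This is only a minimal sketch, assuming plain Java 8 with no Spark dependency. The class name SampleInput, the method writeSample, and the file name words.csv are hypothetical and do not come from the original report; only the five rows are taken from it.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;

public class SampleInput {
    // The five rows quoted in the issue description (word,count).
    public static final List<String> ROWS =
            Arrays.asList("a,2", "c,4", "d,2", "r,1", "t,9");

    // Writes the rows to <dir>/words.csv, one row per line, and returns the file path.
    // The file name "words.csv" is an assumption; the report does not name the file.
    public static Path writeSample(Path dir) throws IOException {
        Files.createDirectories(dir);
        return Files.write(dir.resolve("words.csv"), ROWS, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Create a fresh temporary directory to act as the streaming input directory.
        Path dir = Files.createTempDirectory("spark-26379-input");
        System.out.println("Input directory: " + writeSample(dir).getParent());
    }
}
```

The printed directory can then be used as the value of path in the sample code from the issue description.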