[ https://issues.apache.org/jira/browse/SPARK-21765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142919#comment-16142919 ]
Jacek Laskowski commented on SPARK-21765:
-----------------------------------------
I think {{TextSocketSource}} was missed in the change: it does not set the
{{isStreaming}} flag, which leads to the following error:
{code}
val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "value_count")

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = counts.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(1.hour)).
  outputMode(OutputMode.Complete).
  start
17/08/26 21:16:20 ERROR StreamExecution: Query [id = 980bfeba-5433-49db-9d20-055c965b25f3, runId = b711948f-4def-4205-ac43-ec1e5b852fdb] terminated with error
java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from TextSocketSource[host: localhost, port: 9999] did not have isStreaming=true
Project [_1#66 AS value#71]
+- Project [_1#66]
   +- LocalRelation [_1#66, _2#67]
    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$10.apply(StreamExecution.scala:641)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$10.apply(StreamExecution.scala:636)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:636)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:636)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:274)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:635)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:313)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:301)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:301)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:274)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:301)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:213)
{code}
The fix seems simple: {{TextSocketSource.getBatch}} should create the batch DataFrame with {{isStreaming = true}}, as follows:
{code}
val rdd = sqlContext.sparkContext.parallelize(rawList).map(v => InternalRow(v))
val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
{code}
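To illustrate why the flag has to be set where the batch is created, here is a minimal sketch in plain Scala. It assumes a toy plan model in which an inner node counts as streaming when any of its children is streaming, which is how I understand {{LogicalPlan.isStreaming}} to be derived. {{Plan}}, {{Relation}}, {{Project}} and {{checkBatch}} are hypothetical names for illustration only, not Spark classes or APIs.

```scala
// Toy model only: none of these types are Spark classes.
sealed trait Plan {
  def children: Seq[Plan]
  // An inner node is streaming if at least one child is streaming.
  def isStreaming: Boolean = children.exists(_.isStreaming)
}

// A leaf has no children to derive the flag from, so it carries it explicitly.
final case class Relation(name: String, streaming: Boolean) extends Plan {
  val children: Seq[Plan] = Nil
  override def isStreaming: Boolean = streaming
}

// A projection simply inherits the flag from its single child.
final case class Project(child: Plan) extends Plan {
  val children: Seq[Plan] = Seq(child)
}

object IsStreamingSketch {
  // Hypothetical stand-in for the kind of check that failed in the trace above.
  def checkBatch(source: String, batch: Plan): Either[String, Plan] =
    if (batch.isStreaming) Right(batch)
    else Left(s"DataFrame returned by getBatch from $source did not have isStreaming=true")

  def main(args: Array[String]): Unit = {
    // Same shape as the plan in the error: two projections over one leaf.
    val flagged = Project(Project(Relation("batch", streaming = true)))
    val unflagged = Project(Project(Relation("batch", streaming = false)))

    println(checkBatch("socket", flagged).isRight)
    println(checkBatch("socket", unflagged).isLeft)
  }
}
```

In this model the projections cannot repair a leaf that was created without the flag, which is why the change belongs in {{getBatch}} itself.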
> Ensure all leaf nodes that are derived from streaming sources have
> isStreaming=true
> -----------------------------------------------------------------------------------
>
> Key: SPARK-21765
> URL: https://issues.apache.org/jira/browse/SPARK-21765
> Project: Spark
> Issue Type: Improvement
> Components: SQL, Structured Streaming
> Affects Versions: 2.2.0
> Reporter: Jose Torres
> Fix For: 3.0.0
>
>
> LogicalPlan has an isStreaming bit, but it's incompletely implemented. Some
> streaming sources don't set the bit, and the bit can sometimes be lost in
> rewriting. Setting the bit for all plans that are logically streaming will
> help us simplify the logic around checking query plan validity.