[ https://issues.apache.org/jira/browse/SPARK-21765?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16142919#comment-16142919 ]

Jacek Laskowski commented on SPARK-21765:
-----------------------------------------

I think {{TextSocketSource}} was missed in the change: its {{getBatch}} does not set the
{{isStreaming}} flag, which leads to the following error:

{code}
val counts = spark.
  readStream.
  format("rate").
  load.
  groupBy(window($"timestamp", "5 seconds") as "group").
  agg(count("value") as "value_count")

import scala.concurrent.duration._
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
val sq = counts.
  writeStream.
  format("console").
  option("truncate", false).
  trigger(Trigger.ProcessingTime(1.hour)).
  outputMode(OutputMode.Complete).
  start

17/08/26 21:16:20 ERROR StreamExecution: Query [id = 980bfeba-5433-49db-9d20-055c965b25f3, runId = b711948f-4def-4205-ac43-ec1e5b852fdb] terminated with error
java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from TextSocketSource[host: localhost, port: 9999] did not have isStreaming=true
Project [_1#66 AS value#71]
+- Project [_1#66]
   +- LocalRelation [_1#66, _2#67]

        at scala.Predef$.assert(Predef.scala:170)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$10.apply(StreamExecution.scala:641)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2$$anonfun$apply$10.apply(StreamExecution.scala:636)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
        at scala.collection.Iterator$class.foreach(Iterator.scala:893)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
        at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:636)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$2.apply(StreamExecution.scala:636)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:274)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:635)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(StreamExecution.scala:313)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:301)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$apply$mcZ$sp$1.apply(StreamExecution.scala:301)
        at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:274)
        at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:60)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:301)
        at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:297)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:213)
{code}

The fix seems simple: {{TextSocketSource.getBatch}} should create its batch as follows:

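{code}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.unsafe.types.UTF8String
// rawList holds (line, timestamp) pairs; InternalRow expects Catalyst's internal types
val rdd = sqlContext.sparkContext.parallelize(rawList).map { case (v, ts) =>
  InternalRow(UTF8String.fromString(v), DateTimeUtils.fromJavaTimestamp(ts))
}
val rawBatch = sqlContext.internalCreateDataFrame(rdd, schema, isStreaming = true)
{code}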
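To illustrate why the assertion fires and why setting the flag on the leaf is enough, here is a toy model of a plan tree. It is only a sketch, not Spark's code: {{Plan}}, {{LocalRelation}}, {{Project}} and {{checkBatch}} below are hypothetical stand-ins, assuming just that an inner node is streaming when any of its children is, and that a leaf carries whatever flag it was created with.

{code:scala}
// Hypothetical, simplified model of a logical plan tree; these are NOT Spark's classes.
object IsStreamingSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    // Inner nodes derive the flag from their children; leaves must set it themselves.
    def isStreaming: Boolean = children.exists(_.isStreaming)
  }

  // Leaf node: the flag is whatever its creator passed in (false by default).
  final case class LocalRelation(output: Seq[String], override val isStreaming: Boolean = false)
      extends Plan {
    def children: Seq[Plan] = Nil
  }

  // Inner node: inherits isStreaming from its child.
  final case class Project(columns: Seq[String], child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }

  // Stand-in for the check applied to every getBatch result; Predef.assert throws
  // java.lang.AssertionError("assertion failed: " + message) when the condition is false.
  def checkBatch(source: String, batch: Plan): Unit =
    assert(batch.isStreaming,
      s"DataFrame returned by getBatch from $source did not have isStreaming=true")

  // Same shape as the plan in the trace: projections over a non-streaming local relation.
  val before: Plan =
    Project(Seq("_1 AS value"), Project(Seq("_1"), LocalRelation(Seq("_1", "_2"))))

  // Same projections, but over a leaf created with isStreaming = true.
  val after: Plan =
    Project(Seq("_1 AS value"),
      Project(Seq("_1"), LocalRelation(Seq("_1", "_2"), isStreaming = true)))
}
{code}

In this model {{checkBatch("TextSocketSource", before)}} throws an {{AssertionError}} with the same message shape as the trace, while {{after}} passes because both projections inherit the flag from their leaf, so only the leaf has to be created correctly.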

> Ensure all leaf nodes that are derived from streaming sources have isStreaming=true
> -----------------------------------------------------------------------------------
>
>                 Key: SPARK-21765
>                 URL: https://issues.apache.org/jira/browse/SPARK-21765
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL, Structured Streaming
>    Affects Versions: 2.2.0
>            Reporter: Jose Torres
>             Fix For: 3.0.0
>
>
> LogicalPlan has an isStreaming bit, but it's incompletely implemented. Some 
> streaming sources don't set the bit, and the bit can sometimes be lost in 
> rewriting. Setting the bit for all plans that are logically streaming will 
> help us simplify the logic around checking query plan validity.
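How the bit can be lost in rewriting can be sketched with the same kind of toy model. The rule below is hypothetical (it is not a Catalyst rule): it collapses a projection over a leaf into a new leaf, once forgetting the flag and once carrying it over.

{code:scala}
// Hypothetical, simplified model (NOT Spark's classes) of a rewrite dropping isStreaming.
object RewriteSketch {
  sealed trait Plan {
    def children: Seq[Plan]
    // Inner nodes are streaming if any child is; leaves carry the flag explicitly.
    def isStreaming: Boolean = children.exists(_.isStreaming)
  }

  final case class Relation(output: Seq[String], override val isStreaming: Boolean = false)
      extends Plan {
    def children: Seq[Plan] = Nil
  }

  final case class Project(columns: Seq[String], child: Plan) extends Plan {
    def children: Seq[Plan] = Seq(child)
  }

  // Lossy rewrite: the replacement leaf gets the default flag (false).
  def collapseLossy(plan: Plan): Plan = plan match {
    case Project(cols, Relation(_, _)) => Relation(cols)
    case other                         => other
  }

  // Preserving rewrite: the replacement leaf copies the original leaf's flag.
  def collapsePreserving(plan: Plan): Plan = plan match {
    case Project(cols, Relation(_, streaming)) => Relation(cols, streaming)
    case other                                 => other
  }

  // A logically streaming plan to feed into both rewrites.
  val streamingPlan: Plan =
    Project(Seq("value"), Relation(Seq("value", "timestamp"), isStreaming = true))
}
{code}

Applied to {{streamingPlan}}, the lossy variant returns a plan marked as batch even though it is logically streaming, while the preserving variant keeps {{isStreaming=true}}; every rewrite has to carry the flag for the bit to stay reliable.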



