[ https://issues.apache.org/jira/browse/SPARK-24046?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Apache Spark reassigned SPARK-24046:
------------------------------------

    Assignee: Apache Spark

> Rate Source doesn't gradually increase rate when rampUpTime>=RowsPerSecond
> --------------------------------------------------------------------------
>
>                 Key: SPARK-24046
>                 URL: https://issues.apache.org/jira/browse/SPARK-24046
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>         Environment: Spark 2.3.0 using Spark Shell on Ubuntu 17.4
> (Environment is not important, the issue lies in the rate calculation)
>            Reporter: Gerard Maas
>            Assignee: Apache Spark
>            Priority: Major
>              Labels: RateSource
>         Attachments: image-2018-04-22-22-03-03-945.png, image-2018-04-22-22-06-49-202.png
>
>
> When using the rate source in Structured Streaming, the `rampUpTime` feature
> fails to gradually increase the stream rate when the `rampUpTime` option is
> equal to or greater than `rowsPerSecond`.
> When `rampUpTime >= rowsPerSecond`, all batches at `time < rampUpTime` contain
> no rows. The rate jumps straight to `rowsPerSecond` when `time > rampUpTime`.
> The following scenario, executed in the `spark-shell`, demonstrates this issue:
> {code:java}
> // Using rampUpTime(10) > rowsPerSecond(5)
> {code}
> {code:java}
> val stream = spark.readStream
>   .format("rate")
>   .option("rowsPerSecond", 5)
>   .option("rampUpTime", 10)
>   .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery =
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@cf82c58
> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 7
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 8
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 9
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 10
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 11
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 17:08:...|    0|
> |2018-04-22 17:08:...|    1|
> |2018-04-22 17:08:...|    2|
> |2018-04-22 17:08:...|    3|
> |2018-04-22 17:08:...|    4|
> +--------------------+-----+
> -------------------------------------------
> Batch: 12
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 17:08:...|    5|
> |2018-04-22 17:08:...|    6|
> |2018-04-22 17:08:...|    7|
> |2018-04-22 17:08:...|    8|
> |2018-04-22 17:08:...|    9|
> +--------------------+-----+
> {code}
>
> This scenario shows `rowsPerSecond == rampUpTime`, which also fails:
> {code:java}
> val stream = spark.readStream
>   .format("rate")
>   .option("rowsPerSecond", 10)
>   .option("rampUpTime", 10)
>   .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery =
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@149ef64a
> scala> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 7
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 8
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 9
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 10
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 11
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:32:...|    0|
> |2018-04-22 15:32:...|    1|
> |2018-04-22 15:32:...|    2|
> |2018-04-22 15:32:...|    3|
> |2018-04-22 15:32:...|    4|
> |2018-04-22 15:32:...|    5|
> |2018-04-22 15:32:...|    6|
> |2018-04-22 15:32:...|    7|
> |2018-04-22 15:32:...|    8|
> |2018-04-22 15:32:...|    9|
> +--------------------+-----+
> -------------------------------------------
> Batch: 12
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:32:...|   10|
> |2018-04-22 15:32:...|   11|
> |2018-04-22 15:32:...|   12|
> |2018-04-22 15:32:...|   13|
> |2018-04-22 15:32:...|   14|
> |2018-04-22 15:32:...|   15|
> |2018-04-22 15:32:...|   16|
> |2018-04-22 15:32:...|   17|
> |2018-04-22 15:32:...|   18|
> |2018-04-22 15:32:...|   19|
> +--------------------+-----+
> {code}
>
> In contrast, when `rowsPerSecond > rampUpTime`, the gradual increase happens
> as expected:
>
> {code:java}
> .option("rowsPerSecond", 11)
> .option("rampUpTime", 10)
> {code}
>
> {code:java}
> val stream = spark.readStream
>   .format("rate")
>   .option("rowsPerSecond", 11)
>   .option("rampUpTime", 10)
>   .load()
> val query = stream.writeStream.format("console").start()
> // Exiting paste mode, now interpreting.
> stream: org.apache.spark.sql.DataFrame = [timestamp: timestamp, value: bigint]
> query: org.apache.spark.sql.streaming.StreamingQuery =
> org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@19c6e821
> scala> -------------------------------------------
> Batch: 0
> -------------------------------------------
> +---------+-----+
> |timestamp|value|
> +---------+-----+
> +---------+-----+
> -------------------------------------------
> Batch: 1
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|    0|
> +--------------------+-----+
> -------------------------------------------
> Batch: 2
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|    1|
> |2018-04-22 15:34:...|    2|
> +--------------------+-----+
> -------------------------------------------
> Batch: 3
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|    3|
> |2018-04-22 15:34:...|    4|
> |2018-04-22 15:34:...|    5|
> +--------------------+-----+
> -------------------------------------------
> Batch: 4
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|    6|
> |2018-04-22 15:34:...|    7|
> |2018-04-22 15:34:...|    8|
> |2018-04-22 15:34:...|    9|
> +--------------------+-----+
> -------------------------------------------
> Batch: 5
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|   10|
> |2018-04-22 15:34:...|   11|
> |2018-04-22 15:34:...|   12|
> |2018-04-22 15:34:...|   13|
> |2018-04-22 15:34:...|   14|
> +--------------------+-----+
> -------------------------------------------
> Batch: 6
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|   15|
> |2018-04-22 15:34:...|   16|
> |2018-04-22 15:34:...|   17|
> |2018-04-22 15:34:...|   18|
> |2018-04-22 15:34:...|   19|
> |2018-04-22 15:34:...|   20|
> +--------------------+-----+
> -------------------------------------------
> Batch: 7
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|   21|
> |2018-04-22 15:34:...|   22|
> |2018-04-22 15:34:...|   23|
> |2018-04-22 15:34:...|   24|
> |2018-04-22 15:34:...|   25|
> |2018-04-22 15:34:...|   26|
> |2018-04-22 15:34:...|   27|
> +--------------------+-----+
> -------------------------------------------
> Batch: 8
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|   28|
> |2018-04-22 15:34:...|   29|
> |2018-04-22 15:34:...|   30|
> |2018-04-22 15:34:...|   31|
> |2018-04-22 15:34:...|   32|
> |2018-04-22 15:34:...|   33|
> |2018-04-22 15:34:...|   34|
> |2018-04-22 15:34:...|   35|
> +--------------------+-----+
> -------------------------------------------
> Batch: 9
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|   36|
> |2018-04-22 15:34:...|   37|
> |2018-04-22 15:34:...|   38|
> |2018-04-22 15:34:...|   39|
> |2018-04-22 15:34:...|   40|
> |2018-04-22 15:34:...|   41|
> |2018-04-22 15:34:...|   42|
> |2018-04-22 15:34:...|   43|
> |2018-04-22 15:34:...|   44|
> +--------------------+-----+
> -------------------------------------------
> Batch: 10
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|   45|
> |2018-04-22 15:34:...|   46|
> |2018-04-22 15:34:...|   47|
> |2018-04-22 15:34:...|   48|
> |2018-04-22 15:34:...|   49|
> |2018-04-22 15:34:...|   50|
> |2018-04-22 15:34:...|   51|
> |2018-04-22 15:34:...|   52|
> |2018-04-22 15:34:...|   53|
> |2018-04-22 15:34:...|   54|
> +--------------------+-----+
> -------------------------------------------
> Batch: 11
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|   55|
> |2018-04-22 15:34:...|   56|
> |2018-04-22 15:34:...|   57|
> |2018-04-22 15:34:...|   58|
> |2018-04-22 15:34:...|   59|
> |2018-04-22 15:34:...|   60|
> |2018-04-22 15:34:...|   61|
> |2018-04-22 15:34:...|   62|
> |2018-04-22 15:34:...|   63|
> |2018-04-22 15:34:...|   64|
> |2018-04-22 15:34:...|   65|
> +--------------------+-----+
> -------------------------------------------
> Batch: 12
> -------------------------------------------
> +--------------------+-----+
> |           timestamp|value|
> +--------------------+-----+
> |2018-04-22 15:34:...|   66|
> |2018-04-22 15:34:...|   67|
> |2018-04-22 15:34:...|   68|
> |2018-04-22 15:34:...|   69|
> |2018-04-22 15:34:...|   70|
> |2018-04-22 15:34:...|   71|
> |2018-04-22 15:34:...|   72|
> |2018-04-22 15:34:...|   73|
> |2018-04-22 15:34:...|   74|
> |2018-04-22 15:34:...|   75|
> |2018-04-22 15:34:...|   76|
> +--------------------+-----+
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
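The row counts in all three transcripts are consistent with the ramp-up speed increment being computed with integer division, for example as `rowsPerSecond / (rampUpTime + 1)`: whenever `rowsPerSecond <= rampUpTime` that quotient truncates to 0, so nothing is emitted until the ramp-up period is over. The Scala sketch below is a hypothetical model written under that assumption, not Spark's actual source; `RampUpModel`, `valueAtSecond` and `rowsPerBatch` are illustrative names. It also assumes that each micro-batch covers exactly one second, as appears to be the case in the console output above, and computes the per-batch row counts for batches 1 to 12 of the three scenarios.

```scala
// HYPOTHETICAL model of the rate source ramp-up arithmetic.
// ASSUMPTION: the per-second speed increment uses integer (Long) division.
object RampUpModel {

  // Total number of rows emitted by the end of second `seconds`.
  def valueAtSecond(seconds: Long, rowsPerSecond: Long, rampUpTimeSeconds: Long): Long = {
    // Truncates to 0 whenever rowsPerSecond <= rampUpTimeSeconds,
    // i.e. exactly the rampUpTime >= rowsPerSecond case from the report.
    val speedDeltaPerSecond = rowsPerSecond / (rampUpTimeSeconds + 1)
    if (seconds <= rampUpTimeSeconds) {
      // During ramp-up the speed at second s is speedDeltaPerSecond * s,
      // so the running total is an arithmetic series: delta * s * (s + 1) / 2.
      speedDeltaPerSecond * seconds * (seconds + 1) / 2
    } else {
      // After ramp-up: everything emitted during ramp-up plus the full rate per second.
      valueAtSecond(rampUpTimeSeconds, rowsPerSecond, rampUpTimeSeconds) +
        (seconds - rampUpTimeSeconds) * rowsPerSecond
    }
  }

  // Rows emitted in each one-second batch for seconds 1..lastSecond.
  def rowsPerBatch(rowsPerSecond: Long, rampUpTimeSeconds: Long, lastSecond: Int): Seq[Long] =
    (1 to lastSecond).map { s =>
      valueAtSecond(s, rowsPerSecond, rampUpTimeSeconds) -
        valueAtSecond(s - 1, rowsPerSecond, rampUpTimeSeconds)
    }

  def main(args: Array[String]): Unit = {
    // The three scenarios from the report: 5/10, 10/10 and 11/10.
    for ((rate, rampUp) <- Seq((5L, 10L), (10L, 10L), (11L, 10L))) {
      val counts = rowsPerBatch(rate, rampUp, 12).mkString(" ")
      println(s"rowsPerSecond=$rate rampUpTime=$rampUp -> $counts")
    }
  }
}
```

Running `main` prints `0 0 0 0 0 0 0 0 0 0 5 5` for `rowsPerSecond=5`, `0 0 0 0 0 0 0 0 0 0 10 10` for `rowsPerSecond=10`, and `1 2 3 4 5 6 7 8 9 10 11 11` for `rowsPerSecond=11`, which matches the number of rows in batches 1 to 12 of the corresponding transcripts.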