This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 70fde44  [SPARK-37062][SS] Introduce a new data source for providing 
consistent set of rows per microbatch
70fde44 is described below

commit 70fde44e930926cbcd1fc95fa7cfb915c25cff9c
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Mon Nov 1 20:04:10 2021 +0900

    [SPARK-37062][SS] Introduce a new data source for providing consistent set 
of rows per microbatch
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to introduce a new data source having short name as 
"rate-micro-batch", which produces similar input rows as "rate" (increment long 
values with timestamps), but ensures that each micro-batch has a "predictable" 
set of input rows.
    
    "rate-micro-batch" data source receives a config to specify the number of 
rows per micro-batch, which defines the set of input rows for further 
micro-batches. For example, if the number of rows per micro-batch is set to 
1000, the first batch would have 1000 rows having value range as `0~999`, the 
second batch would have 1000 rows having value range as `1000~1999`, and so on. 
This characteristic brings different use cases compared to rate data source, as 
we can't predict the input rows [...]
    
    For generated time (timestamp column), the data source applies the same 
mechanism to make the value of column be predictable. `startTimestamp` option 
defines the starting value of generated time, and `advanceMillisPerBatch` 
option defines how much time the generated time should advance per micro-batch. 
All input rows in the same micro-batch will have same timestamp.
    
    This source supports the following options:
    
    * `rowsPerBatch` (e.g. 100): How many rows should be generated per 
micro-batch.
    * `numPartitions` (e.g. 10, default: Spark's default parallelism): The 
partition number for the generated rows.
    * `startTimestamp` (e.g. 1000, default: 0): starting value of generated time
    * `advanceMillisPerBatch` (e.g. 1000, default: 1000): the amount of time 
being advanced in generated time on each micro-batch.
    
    ### Why are the changes needed?
    
    The "rate" data source has been known to be used as a benchmark for 
streaming query.
    
    While this helps to put the query to the limit (how many rows the query 
could process per second), the rate data source doesn't provide consistent rows 
per batch into stream, which leads two environments be hard to compare with.
    
    For example, in many cases, you may want to compare the metrics in the 
batches between test environments (like running same streaming query with 
different options). These metrics are strongly affected if the distribution of 
input rows in batches are changing, especially a micro-batch has been lagged 
(in any reason) and rate data source produces more input rows to the next batch.
    
    Also, when you test against streaming aggregation, you may want the data 
source produces the same set of input rows per batch (deterministic), so that 
you can plan how these input rows will be aggregated and how state rows will be 
evicted, and craft the test query based on the plan.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, end users can leverage a new data source in micro-batch mode of 
streaming query to test/benchmark.
    
    ### How was this patch tested?
    
    New UTs, and manually tested via below query in spark-shell:
    
    ```
    spark.readStream.format("rate-micro-batch").option("rowsPerBatch", 
20).option("numPartitions", 3).load().writeStream.format("console").start()
    ```
    
    Closes #34333 from HeartSaVioR/SPARK-37062.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 docs/structured-streaming-programming-guide.md     |  13 ++
 ...org.apache.spark.sql.sources.DataSourceRegister |   1 +
 .../sources/RatePerMicroBatchProvider.scala        | 127 +++++++++++++
 .../sources/RatePerMicroBatchStream.scala          | 175 ++++++++++++++++++
 .../sources/RatePerMicroBatchProviderSuite.scala   | 204 +++++++++++++++++++++
 5 files changed, 520 insertions(+)

diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index b36cdc7..6237d47 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -517,6 +517,8 @@ There are a few built-in sources.
 
   - **Rate source (for testing)** - Generates data at the specified number of 
rows per second, each output row contains a `timestamp` and `value`. Where 
`timestamp` is a `Timestamp` type containing the time of message dispatch, and 
`value` is of `Long` type containing the message count, starting from 0 as the 
first row. This source is intended for testing and benchmarking.
 
+  - **Rate Per Micro-Batch source (for testing)** - Generates data at the 
specified number of rows per micro-batch, each output row contains a 
`timestamp` and `value`. Where `timestamp` is a `Timestamp` type containing the 
time of message dispatch, and `value` is of `Long` type containing the message 
count, starting from 0 as the first row. Unlike `rate` data source, this data 
source provides a consistent set of input rows per micro-batch regardless of 
query execution (configuration of t [...]
+
 Some sources are not fault-tolerant because they do not guarantee that data 
can be replayed using 
 checkpointed offsets after a failure. See the earlier section on 
 [fault-tolerance semantics](#fault-tolerance-semantics).
@@ -588,6 +590,17 @@ Here are the details of all the sources in Spark.
     <td>Yes</td>
     <td></td>
   </tr>
+  <tr>
+    <td><b>Rate Per Micro-Batch Source</b> (format: 
<b>rate-micro-batch</b>)</td>
+    <td>
+        <code>rowsPerBatch</code> (e.g. 100): How many rows should be 
generated per micro-batch.<br/><br/>
+        <code>numPartitions</code> (e.g. 10, default: Spark's default 
parallelism): The partition number for the generated rows. <br/><br/>
+        <code>startTimestamp</code> (e.g. 1000, default: 0): starting value of 
generated time. <br/><br/>
+        <code>advanceMillisPerBatch</code> (e.g. 1000, default: 1000): the 
amount of time being advanced in generated time on each micro-batch. <br/><br/>
+    </td>
+    <td>Yes</td>
+    <td></td>
+  </tr>
 
   <tr>
     <td><b>Kafka Source</b></td>
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index c0b8b27..fe4554a9 100644
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -9,3 +9,4 @@ org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.sources.RateStreamProvider
 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider
 org.apache.spark.sql.execution.datasources.binaryfile.BinaryFileFormat
+org.apache.spark.sql.execution.streaming.sources.RatePerMicroBatchProvider
\ No newline at end of file
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
new file mode 100644
index 0000000..acb565e1
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProvider.scala
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import java.util
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table, 
TableCapability}
+import org.apache.spark.sql.connector.read.{Scan, ScanBuilder}
+import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, 
MicroBatchStream}
+import org.apache.spark.sql.internal.connector.SimpleTableProvider
+import org.apache.spark.sql.sources.DataSourceRegister
+import org.apache.spark.sql.types.{LongType, StructField, StructType, 
TimestampType}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+/**
+ *  A source that generates increment long values with timestamps. Each 
generated row has two
+ *  columns: a timestamp column for the generated time and an auto increment 
long column starting
+ *  with 0L.
+ *
+ *  This source supports the following options:
+ *  - `rowsPerMicroBatch` (e.g. 100): How many rows should be generated per 
micro-batch.
+ *  - `numPartitions` (e.g. 10, default: Spark's default parallelism): The 
partition number for the
+ *    generated rows.
+ *  - `startTimestamp` (e.g. 1000, default: 0): starting value of generated 
time
+ *  - `advanceMillisPerMicroBatch` (e.g. 1000, default: 1000): the amount of 
time being advanced in
+ *    generated time on each micro-batch.
+ *
+ *  Unlike `rate` data source, this data source provides a consistent set of 
input rows per
+ *  micro-batch regardless of query execution (configuration of trigger, query 
being lagging, etc.),
+ *  say, batch 0 will produce 0~999 and batch 1 will produce 1000~1999, and so 
on. Same applies to
+ *  the generated time.
+ *
+ *  As the name represents, this data source only supports micro-batch read.
+ */
+class RatePerMicroBatchProvider extends SimpleTableProvider with 
DataSourceRegister {
+  import RatePerMicroBatchProvider._
+
+  override def getTable(options: CaseInsensitiveStringMap): Table = {
+    val rowsPerBatch = options.getLong(ROWS_PER_BATCH, 0)
+    if (rowsPerBatch <= 0) {
+      throw new IllegalArgumentException(
+        s"Invalid value '$rowsPerBatch'. The option 'rowsPerBatch' must be 
positive")
+    }
+
+    val numPartitions = options.getInt(
+      NUM_PARTITIONS, SparkSession.active.sparkContext.defaultParallelism)
+    if (numPartitions <= 0) {
+      throw new IllegalArgumentException(
+        s"Invalid value '$numPartitions'. The option 'numPartitions' must be 
positive")
+    }
+
+    val startTimestamp = options.getLong(START_TIMESTAMP, 0)
+    if (startTimestamp < 0) {
+      throw new IllegalArgumentException(
+        s"Invalid value '$startTimestamp'. The option 'startTimestamp' must be 
non-negative")
+    }
+
+    val advanceMillisPerBatch = options.getInt(ADVANCE_MILLIS_PER_BATCH, 1000)
+    if (advanceMillisPerBatch < 0) {
+      throw new IllegalArgumentException(
+        s"Invalid value '$advanceMillisPerBatch'. The option 
'advanceMillisPerBatch' " +
+          "must be non-negative")
+    }
+
+    new RatePerMicroBatchTable(rowsPerBatch, numPartitions, startTimestamp,
+      advanceMillisPerBatch)
+  }
+
+  override def shortName(): String = "rate-micro-batch"
+}
+
+class RatePerMicroBatchTable(
+    rowsPerBatch: Long,
+    numPartitions: Int,
+    startTimestamp: Long,
+    advanceMillisPerBatch: Int) extends Table with SupportsRead {
+  override def name(): String = {
+    s"RatePerMicroBatch(rowsPerBatch=$rowsPerBatch, 
numPartitions=$numPartitions," +
+      s"startTimestamp=$startTimestamp, 
advanceMillisPerBatch=$advanceMillisPerBatch)"
+  }
+
+  override def schema(): StructType = RatePerMicroBatchProvider.SCHEMA
+
+  override def capabilities(): util.Set[TableCapability] = {
+    util.EnumSet.of(TableCapability.MICRO_BATCH_READ)
+  }
+
+  override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder 
= () => new Scan {
+    override def readSchema(): StructType = RatePerMicroBatchProvider.SCHEMA
+
+    override def toMicroBatchStream(checkpointLocation: String): 
MicroBatchStream =
+      new RatePerMicroBatchStream(rowsPerBatch, numPartitions, startTimestamp,
+        advanceMillisPerBatch, options)
+
+    override def toContinuousStream(checkpointLocation: String): 
ContinuousStream = {
+      throw new UnsupportedOperationException("continuous mode is not 
supported!")
+    }
+  }
+}
+
+object RatePerMicroBatchProvider {
+  val SCHEMA =
+    StructType(StructField("timestamp", TimestampType) :: StructField("value", 
LongType) :: Nil)
+
+  val VERSION = 1
+
+  val NUM_PARTITIONS = "numPartitions"
+  val ROWS_PER_BATCH = "rowsPerBatch"
+  val START_TIMESTAMP = "startTimestamp"
+  val ADVANCE_MILLIS_PER_BATCH = "advanceMillisPerBatch"
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
new file mode 100644
index 0000000..6954e45
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchStream.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.json4s.NoTypeHints
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, 
PartitionReaderFactory}
+import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, 
Offset, ReadLimit, SupportsTriggerAvailableNow}
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+class RatePerMicroBatchStream(
+    rowsPerBatch: Long,
+    numPartitions: Int,
+    startTimestamp: Long,
+    advanceMsPerBatch: Int,
+    options: CaseInsensitiveStringMap)
+  extends SupportsTriggerAvailableNow with MicroBatchStream with Logging {
+
+  override def initialOffset(): Offset = RatePerMicroBatchStreamOffset(0L, 
startTimestamp)
+
+  override def latestOffset(): Offset = {
+    throw new UnsupportedOperationException(
+      "latestOffset(Offset, ReadLimit) should be called instead of this 
method")
+  }
+
+  override def getDefaultReadLimit: ReadLimit = {
+    ReadLimit.maxRows(rowsPerBatch)
+  }
+
+  private def extractOffsetAndTimestamp(offset: Offset): (Long, Long) = {
+    offset match {
+      case o: RatePerMicroBatchStreamOffset => (o.offset, o.timestamp)
+      case _ => throw new IllegalStateException("The type of Offset should be 
" +
+        "RatePerMicroBatchStreamOffset")
+    }
+  }
+
+  private var isTriggerAvailableNow: Boolean = false
+  private var offsetForTriggerAvailableNow: Offset = _
+
+  override def prepareForTriggerAvailableNow(): Unit = {
+    isTriggerAvailableNow = true
+  }
+
+  override def latestOffset(startOffset: Offset, limit: ReadLimit): Offset = {
+    def calculateNextOffset(start: Offset): Offset = {
+      val (startOffsetLong, timestampAtStartOffset) = 
extractOffsetAndTimestamp(start)
+      // NOTE: This means the data source will provide a set of inputs for 
"single" batch if
+      // the trigger is Trigger.Once.
+      val numRows = rowsPerBatch
+
+      val endOffsetLong = Math.min(startOffsetLong + numRows, Long.MaxValue)
+      val endOffset = RatePerMicroBatchStreamOffset(endOffsetLong,
+        timestampAtStartOffset + advanceMsPerBatch)
+
+      endOffset
+    }
+
+    if (isTriggerAvailableNow) {
+      if (offsetForTriggerAvailableNow == null) {
+        offsetForTriggerAvailableNow = calculateNextOffset(startOffset)
+      }
+
+      offsetForTriggerAvailableNow
+    } else {
+      calculateNextOffset(startOffset)
+    }
+  }
+
+  override def deserializeOffset(json: String): Offset = {
+    RatePerMicroBatchStreamOffset.apply(json)
+  }
+
+  override def planInputPartitions(start: Offset, end: Offset): 
Array[InputPartition] = {
+    val (startOffset, startTimestamp) = extractOffsetAndTimestamp(start)
+    val (endOffset, endTimestamp) = extractOffsetAndTimestamp(end)
+
+    assert(startOffset <= endOffset, s"startOffset($startOffset) > 
endOffset($endOffset)")
+    assert(startTimestamp <= endTimestamp,
+      s"startTimestamp($startTimestamp) > endTimestamp($endTimestamp)")
+    logDebug(s"startOffset: $startOffset, startTimestamp: $startTimestamp, " +
+      s"endOffset: $endOffset, endTimestamp: $endTimestamp")
+
+    if (startOffset == endOffset) {
+      Array.empty
+    } else {
+      (0 until numPartitions).map { p =>
+        RatePerMicroBatchStreamInputPartition(p, numPartitions, startOffset,
+          startTimestamp, endOffset, endTimestamp)
+      }.toArray
+    }
+  }
+
+  override def createReaderFactory(): PartitionReaderFactory = {
+    RatePerMicroBatchStreamReaderFactory
+  }
+
+  override def commit(end: Offset): Unit = {}
+
+  override def stop(): Unit = {}
+
+  override def toString: String = 
s"RatePerMicroBatchStream[rowsPerBatch=$rowsPerBatch, " +
+    s"numPartitions=$numPartitions, startTimestamp=$startTimestamp, " +
+    s"advanceMsPerBatch=$advanceMsPerBatch]"
+}
+
+case class RatePerMicroBatchStreamOffset(offset: Long, timestamp: Long) 
extends Offset {
+  override def json(): String = {
+    Serialization.write(this)(RatePerMicroBatchStreamOffset.formats)
+  }
+}
+
+object RatePerMicroBatchStreamOffset {
+  implicit val formats = Serialization.formats(NoTypeHints)
+
+  def apply(json: String): RatePerMicroBatchStreamOffset =
+    Serialization.read[RatePerMicroBatchStreamOffset](json)
+}
+
+case class RatePerMicroBatchStreamInputPartition(
+    partitionId: Int,
+    numPartitions: Int,
+    startOffset: Long,
+    startTimestamp: Long,
+    endOffset: Long,
+    endTimestamp: Long) extends InputPartition
+
+object RatePerMicroBatchStreamReaderFactory extends PartitionReaderFactory {
+  override def createReader(partition: InputPartition): 
PartitionReader[InternalRow] = {
+    val p = partition.asInstanceOf[RatePerMicroBatchStreamInputPartition]
+    new RatePerMicroBatchStreamPartitionReader(p.partitionId, p.numPartitions,
+      p.startOffset, p.startTimestamp, p.endOffset)
+  }
+}
+
+class RatePerMicroBatchStreamPartitionReader(
+    partitionId: Int,
+    numPartitions: Int,
+    startOffset: Long,
+    startTimestamp: Long,
+    endOffset: Long) extends PartitionReader[InternalRow] {
+  private var count: Long = 0
+
+  override def next(): Boolean = {
+    startOffset + partitionId + numPartitions * count < endOffset
+  }
+
+  override def get(): InternalRow = {
+    val currValue = startOffset + partitionId + numPartitions * count
+    count += 1
+
+    InternalRow(DateTimeUtils.millisToMicros(startTimestamp), currValue)
+  }
+
+  override def close(): Unit = {}
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
new file mode 100644
index 0000000..449aea8
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+
+import org.apache.spark.sql.execution.datasources.DataSource
+import org.apache.spark.sql.functions.spark_partition_id
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.streaming.util.StreamManualClock
+
+class RatePerMicroBatchProviderSuite extends StreamTest {
+
+  import testImplicits._
+
+  test("RatePerMicroBatchProvider in registry") {
+    val ds = DataSource.lookupDataSource("rate-micro-batch", 
spark.sqlContext.conf).newInstance()
+    assert(ds.isInstanceOf[RatePerMicroBatchProvider], "Could not find 
rate-micro-batch source")
+  }
+
+  test("basic") {
+    val input = spark.readStream
+      .format("rate-micro-batch")
+      .option("rowsPerBatch", "10")
+      .option("startTimestamp", "1000")
+      .option("advanceMillisPerBatch", "50")
+      .load()
+    val clock = new StreamManualClock
+    testStream(input)(
+      StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock),
+      waitUntilBatchProcessed(clock),
+      CheckLastBatch((0 until 10).map(v => new java.sql.Timestamp(1000L) -> 
v): _*),
+      AdvanceManualClock(10),
+      waitUntilBatchProcessed(clock),
+      CheckLastBatch((10 until 20).map(v => new java.sql.Timestamp(1050L) -> 
v): _*),
+      AdvanceManualClock(10),
+      waitUntilBatchProcessed(clock),
+      CheckLastBatch((20 until 30).map(v => new java.sql.Timestamp(1100L) -> 
v): _*)
+    )
+  }
+
+  test("restart") {
+    withTempDir { dir =>
+      val input = spark.readStream
+        .format("rate-micro-batch")
+        .option("rowsPerBatch", "10")
+        .load()
+        .select('value)
+
+      val clock = new StreamManualClock
+      testStream(input)(
+        StartStream(checkpointLocation = dir.getAbsolutePath,
+          trigger = Trigger.ProcessingTime(10), triggerClock = clock),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch(0 until 10: _*),
+        AdvanceManualClock(10),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch(10 until 20: _*),
+        StopStream
+      )
+
+      testStream(input)(
+        StartStream(checkpointLocation = dir.getAbsolutePath,
+          trigger = Trigger.ProcessingTime(10), triggerClock = clock),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch(20 until 30: _*)
+      )
+    }
+  }
+
+  test("Trigger.Once") {
+    testTrigger(Trigger.Once())
+  }
+
+  test("Trigger.AvailableNow") {
+    testTrigger(Trigger.AvailableNow())
+  }
+
+  private def testTrigger(triggerToTest: Trigger): Unit = {
+    withTempDir { dir =>
+      val input = spark.readStream
+        .format("rate-micro-batch")
+        .option("rowsPerBatch", "10")
+        .load()
+        .select('value)
+
+      val clock = new StreamManualClock
+      testStream(input)(
+        StartStream(checkpointLocation = dir.getAbsolutePath,
+          trigger = Trigger.ProcessingTime(10), triggerClock = clock),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch(0 until 10: _*),
+        StopStream
+      )
+
+      testStream(input)(
+        StartStream(checkpointLocation = dir.getAbsolutePath, trigger = 
triggerToTest,
+          triggerClock = clock),
+        ProcessAllAvailable(),
+        CheckLastBatch(10 until 20: _*),
+        StopStream
+      )
+
+      testStream(input)(
+        StartStream(checkpointLocation = dir.getAbsolutePath,
+          trigger = Trigger.ProcessingTime(10), triggerClock = clock),
+        waitUntilBatchProcessed(clock),
+        CheckLastBatch(20 until 30: _*),
+        StopStream
+      )
+    }
+  }
+
+  private def waitUntilBatchProcessed(clock: StreamManualClock) = 
AssertOnQuery { q =>
+    eventually(Timeout(streamingTimeout)) {
+      if (!q.exception.isDefined) {
+        assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
+      }
+    }
+    if (q.exception.isDefined) {
+      throw q.exception.get
+    }
+    true
+  }
+
+  test("numPartitions") {
+    val input = spark.readStream
+      .format("rate-micro-batch")
+      .option("rowsPerBatch", "10")
+      .option("numPartitions", "6")
+      .load()
+      .select(spark_partition_id())
+      .distinct()
+    val clock = new StreamManualClock
+    testStream(input)(
+      StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock),
+      waitUntilBatchProcessed(clock),
+      CheckLastBatch(0 until 6: _*)
+    )
+  }
+
+  testQuietly("illegal option values") {
+    def testIllegalOptionValue(
+        option: String,
+        value: String,
+        expectedMessages: Seq[String]): Unit = {
+      val e = intercept[IllegalArgumentException] {
+        var stream = spark.readStream
+          .format("rate-micro-batch")
+          .option(option, value)
+
+        if (option != "rowsPerBatch") {
+          stream = stream.option("rowsPerBatch", "1")
+        }
+
+        stream.load()
+          .writeStream
+          .format("console")
+          .start()
+          .awaitTermination()
+      }
+      for (msg <- expectedMessages) {
+        assert(e.getMessage.contains(msg))
+      }
+    }
+
+    testIllegalOptionValue("rowsPerBatch", "-1", Seq("-1", "rowsPerBatch", 
"positive"))
+    testIllegalOptionValue("rowsPerBatch", "0", Seq("0", "rowsPerBatch", 
"positive"))
+    testIllegalOptionValue("numPartitions", "-1", Seq("-1", "numPartitions", 
"positive"))
+    testIllegalOptionValue("numPartitions", "0", Seq("0", "numPartitions", 
"positive"))
+
+    // RatePerMicroBatchProvider allows setting below options to 0
+    testIllegalOptionValue("advanceMillisPerBatch", "-1",
+      Seq("-1", "advanceMillisPerBatch", "non-negative"))
+    testIllegalOptionValue("startTimestamp", "-1", Seq("-1", "startTimestamp", 
"non-negative"))
+  }
+
+  test("user-specified schema given") {
+    val exception = intercept[UnsupportedOperationException] {
+      spark.readStream
+        .format("rate-micro-batch")
+        .option("rowsPerBatch", "10")
+        .schema(spark.range(1).schema)
+        .load()
+    }
+    assert(exception.getMessage.contains(
+      "RatePerMicroBatchProvider source does not support user-specified 
schema"))
+  }
+}

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to