This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 3e8343205c4d [SPARK-47469][SS][TESTS] Add `Trigger.AvailableNow` tests for `transformWithState` operator
3e8343205c4d is described below
commit 3e8343205c4d434076c013acd14cbfd8736241d4
Author: jingz-db <[email protected]>
AuthorDate: Tue Mar 26 18:24:13 2024 +0900
[SPARK-47469][SS][TESTS] Add `Trigger.AvailableNow` tests for `transformWithState` operator
### What changes were proposed in this pull request?
Add tests covering the `Trigger.AvailableNow` trigger mode for the `transformWithState` operator.
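For context, the pattern the new tests exercise looks roughly like the sketch below. This is a minimal illustration only, not code from this patch; the input path is hypothetical, and `RunningCountStatefulProcessor` is the existing test processor that the suite reuses.

```scala
import org.apache.spark.sql.streaming.{OutputMode, TimeoutMode, Trigger}
import spark.implicits._

// Minimal sketch: run a stateful transformWithState query with
// Trigger.AvailableNow, which processes all data available at start time
// (honoring source rate limits such as maxFilesPerTrigger) and then stops.
val query = spark.readStream
  .option("maxFilesPerTrigger", "1")   // rate limit: one file per microbatch
  .text("/tmp/input")                  // hypothetical input directory
  .as[String]
  .groupByKey(x => x)
  .transformWithState(new RunningCountStatefulProcessor(),
    TimeoutMode.NoTimeouts(),
    OutputMode.Update())
  .writeStream
  .format("console")
  .trigger(Trigger.AvailableNow())
  .start()

query.awaitTermination()               // returns once all available data is processed
```

With `maxFilesPerTrigger` set to 1, every input file becomes its own microbatch, which is what lets the new tests assert exact batch counts across query restarts.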
### Why are the changes needed?
These tests are needed for compliance with the state-v2 test plan.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Added unit tests to the existing `TransformWithStateSuite`.
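For anyone reproducing this locally, the usual Spark sbt invocation should work (shown here as a convenience, not part of the patch):

```
build/sbt "sql/testOnly org.apache.spark.sql.streaming.TransformWithStateSuite"
```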
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #45596 from jingz-db/avaiNow-tests-state-v2.
Authored-by: jingz-db <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
---
.../sql/streaming/TransformWithStateSuite.scala | 101 ++++++++++++++++++++-
1 file changed, 100 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 0fd2ef055ffc..24b0d59c45c5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -17,9 +17,13 @@
package org.apache.spark.sql.streaming
+import java.io.File
+import java.util.UUID
+
import org.apache.spark.SparkRuntimeException
import org.apache.spark.internal.Logging
-import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.{Dataset, Encoders}
+import org.apache.spark.sql.catalyst.util.stringToFile
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StatefulProcessorCannotPerformOperationWithInvalidHandleState, StateStoreMultipleColumnFamiliesNotSupportedException}
import org.apache.spark.sql.functions.timestamp_seconds
@@ -650,6 +654,101 @@ class TransformWithStateSuite extends StateStoreMetricsTest
)
}
}
+
+ /** Create a text file with a single data item */
+ private def createFile(data: String, srcDir: File): File =
+ stringToFile(new File(srcDir, s"${UUID.randomUUID()}.txt"), data)
+
+ private def createFileStream(srcDir: File): Dataset[(String, String)] = {
+ spark
+ .readStream
+ .option("maxFilesPerTrigger", "1")
+ .text(srcDir.getCanonicalPath)
+ .select("value").as[String]
+ .groupByKey(x => x)
+ .transformWithState(new RunningCountStatefulProcessor(),
+ TimeoutMode.NoTimeouts(),
+ OutputMode.Update())
+ }
+
+ test("transformWithState - availableNow trigger mode, rate limit is
respected") {
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName) {
+ withTempDir { srcDir =>
+
+ Seq("a", "b", "c").foreach(createFile(_, srcDir))
+
+ // Set up a query to read text files one at a time
+ val df = createFileStream(srcDir)
+
+ testStream(df)(
+ StartStream(trigger = Trigger.AvailableNow()),
+ ProcessAllAvailable(),
+ CheckNewAnswer(("a", "1"), ("b", "1"), ("c", "1")),
+ StopStream,
+ Execute { _ =>
+ createFile("a", srcDir)
+ },
+ StartStream(trigger = Trigger.AvailableNow()),
+ ProcessAllAvailable(),
+ CheckNewAnswer(("a", "2"))
+ )
+
+ var index = 0
+ val foreachBatchDf = df.writeStream
+ .foreachBatch((_: Dataset[(String, String)], _: Long) => {
+ index += 1
+ })
+ .trigger(Trigger.AvailableNow())
+ .start()
+
+ try {
+ foreachBatchDf.awaitTermination()
+ assert(index == 4)
+ } finally {
+ foreachBatchDf.stop()
+ }
+ }
+ }
+ }
+
+ test("transformWithState - availableNow trigger mode, multiple restarts") {
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName) {
+ withTempDir { srcDir =>
+ Seq("a", "b", "c").foreach(createFile(_, srcDir))
+ val df = createFileStream(srcDir)
+
+ var index = 0
+
+ def startTriggerAvailableNowQueryAndCheck(expectedIdx: Int): Unit = {
+ val q = df.writeStream
+ .foreachBatch((_: Dataset[(String, String)], _: Long) => {
+ index += 1
+ })
+ .trigger(Trigger.AvailableNow)
+ .start()
+ try {
+ assert(q.awaitTermination(streamingTimeout.toMillis))
+ assert(index == expectedIdx)
+ } finally {
+ q.stop()
+ }
+ }
+ // start query for the first time
+ startTriggerAvailableNowQueryAndCheck(3)
+
+ // add two files and restart
+ createFile("a", srcDir)
+ createFile("b", srcDir)
+ startTriggerAvailableNowQueryAndCheck(8)
+
+ // try restart again
+ createFile("d", srcDir)
+ startTriggerAvailableNowQueryAndCheck(14)
+ }
+ }
+ }
}
class TransformWithStateValidationSuite extends StateStoreMetricsTest {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]