Hi all,
I'd like to propose adding a Python-native port of the Scala `StreamTest`
framework
(`sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala`)
to PySpark, and invite review of a 4-PR stack that delivers it.
*Why*
Scala's `StreamTest` provides a declarative, action-based testing paradigm
used by ~38 Structured Streaming test suites in the Apache Spark codebase.
PySpark has no equivalent today: streaming tests are written ad hoc with
manual `writeStream` / `processAllAvailable` / `time.sleep` loops, which
makes them verbose, fragile, and slow to develop. As PySpark streaming
usage grows -- including `transformWithState`, Python data sources, and
Real-Time Mode (RTM) --
the lack of a structured testing framework is increasingly a productivity
tax on PySpark contributors and downstream users writing PySpark streaming
applications.
*What*
A new test-only utility package `pyspark.testing.streaming` plus a small
set of test-only helpers in `PythonSQLUtils.scala`. Tests are written as a
sequence of actions:
class WordCountTest(StreamTest):
    def test_count(self):
        source = MemoryStream(self.spark, "string")
        counts = source.to_df().groupBy("value").count()
        self.run_stream_test(
            counts,
            AddData(source, "a", "b", "a"),
            CheckAnswer(("a", 2), ("b", 1)),
            output_mode="complete",
        )
The framework supports the full action vocabulary you'd expect from the
Scala equivalent -- `StartStream`, `StopStream`, `AddData`,
`ProcessAllAvailable`, `CheckAnswer` / `CheckLastBatch` / `CheckNewAnswer`
/ `CheckAnswerByFunc`, `Assert` / `AssertOnQuery` / `Execute`,
`ExpectFailure` -- plus Real-Time Mode (`LowLatencyMemoryStream`,
`ContinuousMemorySink`, `Trigger.RealTime`) via polling Check actions.
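In Real-Time Mode a check cannot assume a batch boundary has been reached,
so the Check actions poll the sink until the expected rows appear or a
timeout expires. As a rough, self-contained illustration of that polling
mechanism (pure Python; this is a sketch, not the framework's actual code,
and `poll_until` is a hypothetical name):

```python
import time

def poll_until(condition, timeout=10.0, interval=0.1):
    """Re-evaluate `condition` until it returns True or `timeout`
    seconds elapse. Returns the final result of `condition`."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)

# A polling CheckAnswer re-reads the sink on every iteration and
# compares against the expected rows, instead of checking exactly once.
sink_rows = [("a", 2), ("b", 1)]   # stand-in for rows read back from the sink
assert poll_until(lambda: sorted(sink_rows) == [("a", 2), ("b", 1)], timeout=1.0)
```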
*How*
The strategy is to keep the JVM streaming runtime as the source of truth
and put a thin Python layer on top of it. Rather than reimplement memory
sources and sinks in Python, the framework wraps the existing JVM
`MemoryStream[Row]` / `MemorySink` / `LowLatencyMemoryStream` /
`ContinuousMemorySink` through a handful of test-only helpers added to
`PythonSQLUtils.scala`, and exposes a Python `unittest.TestCase` subclass
(`StreamTest`) whose `run_stream_test()` driver walks an action list and
dispatches against those helpers. The result is action-for-action parity
with Scala `StreamTest` for the common case, idiomatic Python at the call
site (flexible expected-row shapes, dataclass-style actions, type hints),
and -- because the JVM does the heavy lifting -- minimal risk of behavioral
drift in testing semantics.
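To make the driver shape concrete, here is a minimal sketch of the
action-list / dispatch pattern in pure Python (no Spark involved; action
and function names mirror the proposal but the bodies are illustrative
stand-ins -- the real `run_stream_test()` delegates the streaming work to
the JVM helpers):

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, Callable, Tuple

# Dataclass-style actions, as in the proposed API surface.
@dataclass
class AddData:
    rows: Tuple[Any, ...]

@dataclass
class CheckAnswer:
    expected: Tuple[Any, ...]

@dataclass
class Execute:
    func: Callable[[list], None]

def run_stream_test(query: Callable[[list], list], *actions) -> None:
    """Walk the action list in order, dispatching on action type.
    `query` stands in for the streaming query: here it is simply a
    function from the accumulated input rows to output rows."""
    inputs: list = []
    for action in actions:
        if isinstance(action, AddData):
            inputs.extend(action.rows)
        elif isinstance(action, CheckAnswer):
            result = sorted(query(inputs))
            assert result == sorted(action.expected), f"got {result}"
        elif isinstance(action, Execute):
            action.func(inputs)
        else:
            raise TypeError(f"unknown action: {action!r}")

# Usage: a word-count "query" over the accumulated input.
run_stream_test(
    lambda rows: list(Counter(rows).items()),
    AddData(rows=("a", "b", "a")),
    CheckAnswer(expected=(("a", 2), ("b", 1))),
)
```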
*PR stack*
  # | Branch                  | Scope                                                                   | Tests
  1 | streamtest-py-1-bridge  | JVM bridge + MemoryStream Python wrapper                                | 11
  2 | streamtest-py-2-base    | StreamTest base + lifecycle/assertion actions                           | +12
  3 | streamtest-py-3-checks  | CheckAnswer family + flexible-type expected values                      | +26
  4 | streamtest-py-4-rtm     | Real-Time Mode (LowLatencyMemoryStream, ContinuousMemorySink, polling)  | +13
The full design doc, including the architecture diagram and usage examples,
lives at:
https://github.com/jerrypeng/spark/blob/stack/streamtest-py-5-design-doc/python/pyspark/testing/streaming/DESIGN.md
*Asks*
I would appreciate feedback on this effort. Full disclosure: I used Claude
to help with much of this port.
It would be great to get reviews on the PRs, starting from the first one:
https://github.com/apache/spark/pull/55656
Thanks,
Jerry