Hi all,

I'd like to propose adding a Python-native port of the Scala `StreamTest`
framework
(`sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala`)
to PySpark, and invite review of a 4-PR stack that delivers it.

*Why*

Scala's `StreamTest` provides a declarative, action-based testing paradigm
used by ~38 Structured Streaming test suites in the Apache Spark codebase.
PySpark has no equivalent today: streaming tests are written ad-hoc with
manual `writeStream` / `processAllAvailable` / `time.sleep` loops, which
makes them verbose, fragile, and slow to develop. As PySpark streaming
usage grows -- including `transformWithState`, Python data sources, and Real-Time Mode (RTM) --
the lack of a structured testing framework is increasingly a productivity
tax on PySpark contributors and downstream users writing PySpark streaming
applications.

*What*

A new test-only utility package `pyspark.testing.streaming` plus a small
set of test-only helpers in `PythonSQLUtils.scala`. Tests are written as a
sequence of actions:

  class WordCountTest(StreamTest):

      def test_count(self):
          source = MemoryStream(self.spark, "string")
          counts = source.to_df().groupBy("value").count()

          self.run_stream_test(
              counts,
              AddData(source, "a", "b", "a"),
              CheckAnswer(("a", 2), ("b", 1)),
              output_mode="complete",
          )

The framework supports the full action vocabulary you'd expect from the
Scala equivalent -- `StartStream`, `StopStream`, `AddData`,
`ProcessAllAvailable`, `CheckAnswer` / `CheckLastBatch` / `CheckNewAnswer`
/ `CheckAnswerByFunc`, `Assert` / `AssertOnQuery` / `Execute`,
`ExpectFailure` -- plus Real-Time Mode (`LowLatencyMemoryStream`,
`ContinuousMemorySink`, `Trigger.RealTime`) via polling Check actions.
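
The difference between the three Check variants, and the idea behind a polling check, can be sketched in plain Python. This is a hypothetical model for illustration only. `FakeSink`, `check_answer`, `check_last_batch`, `check_new_answer` and `check_answer_eventually` are made-up names, not the API in the PRs. The sketch assumes an append-mode sink that keeps one list of rows per committed batch, and it follows one reading of Scala `StreamTest` semantics that may differ in detail from the real framework.

```python
import time
from collections import Counter
from typing import Any, List, Tuple

Row = Tuple[Any, ...]  # rows are modeled as plain tuples in this sketch


class FakeSink:
    """Hypothetical stand-in for an append-mode MemorySink: one row list per committed batch."""

    def __init__(self) -> None:
        self.batches: List[List[Row]] = []
        self.checked_upto = 0  # number of batches already covered by check_new_answer

    def all_rows(self) -> List[Row]:
        return [row for batch in self.batches for row in batch]


def _assert_rows(label: str, actual: List[Row], expected: Tuple[Row, ...]) -> None:
    # Order-insensitive but duplicate-sensitive comparison (multiset equality).
    if Counter(actual) != Counter(expected):
        raise AssertionError(f"{label}: expected {list(expected)}, got {actual}")


def check_answer(sink: FakeSink, *expected: Row) -> None:
    """Compare everything the sink has received so far."""
    _assert_rows("all rows", sink.all_rows(), expected)


def check_last_batch(sink: FakeSink, *expected: Row) -> None:
    """Compare only the most recently committed batch."""
    _assert_rows("last batch", sink.batches[-1] if sink.batches else [], expected)


def check_new_answer(sink: FakeSink, *expected: Row) -> None:
    """Compare only rows from batches committed since the previous check_new_answer."""
    new_rows = [row for batch in sink.batches[sink.checked_upto:] for row in batch]
    sink.checked_upto = len(sink.batches)  # advance the marker even if the check fails
    _assert_rows("new rows", new_rows, expected)


def check_answer_eventually(sink: FakeSink, *expected: Row,
                            timeout_s: float = 5.0, interval_s: float = 0.05) -> None:
    """Polling variant: re-read the sink until it matches or the timeout expires."""
    deadline = time.monotonic() + timeout_s
    while True:
        actual = sink.all_rows()
        if Counter(actual) == Counter(expected):
            return
        if time.monotonic() >= deadline:
            raise AssertionError(f"no match within {timeout_s}s; last seen {actual}")
        time.sleep(interval_s)
```

In Real-Time Mode, output can reach the sink while a batch is still running, which is presumably why the Check actions there poll the sink rather than block on batch completion.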

*How*

The strategy is to keep the JVM streaming runtime as the source of truth
and put a thin Python layer on top of it. Rather than reimplementing the memory
source and sink in Python, the framework wraps the existing JVM
`MemoryStream[Row]` / `MemorySink` / `LowLatencyMemoryStream` /
`ContinuousMemorySink` through a handful of test-only helpers added to
`PythonSQLUtils.scala`, and exposes a Python `unittest.TestCase` subclass
(`StreamTest`) whose `run_stream_test()` driver walks an action list and
dispatches against those helpers. The result is action-for-action parity
with Scala `StreamTest` for the common case, idiomatic Python at the call
site (flexible expected-row shapes, dataclass-style actions, type hints),
and -- because the JVM does the heavy lifting -- minimal risk of behavioral
drift in testing semantics.
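
The "walk an action list and dispatch" structure above can be illustrated with a self-contained, pure-Python toy. It is a hypothetical sketch, not the implementation in the PRs. `FakeMemoryStream` stands in for the JVM `MemoryStream`, and the query is modeled as a plain function recomputed over all input seen so far (roughly complete output mode). Only `AddData`, `ProcessAllAvailable` and `CheckAnswer` are handled. Scala `CheckAnswer` blocks until added data has been processed, and the toy version likewise drains pending input before comparing.

```python
from collections import Counter
from dataclasses import dataclass
from typing import Any, Callable, List, Tuple

Row = Tuple[Any, ...]


class FakeMemoryStream:
    """Hypothetical stand-in for the JVM MemoryStream: buffers rows until a batch runs."""

    def __init__(self) -> None:
        self.pending: List[Any] = []


@dataclass(init=False)
class AddData:
    source: FakeMemoryStream
    rows: Tuple[Any, ...]

    def __init__(self, source: FakeMemoryStream, *rows: Any) -> None:
        self.source = source
        self.rows = rows


@dataclass
class ProcessAllAvailable:
    pass


@dataclass(init=False)
class CheckAnswer:
    expected: Tuple[Row, ...]

    def __init__(self, *expected: Row) -> None:
        self.expected = expected


def run_stream_test(query: Callable[[List[Any]], List[Row]], *actions: Any) -> List[Row]:
    """Walk the actions in order, dispatching on each action's type."""
    sources: List[FakeMemoryStream] = []
    consumed: List[Any] = []  # every input row processed so far
    result: List[Row] = []    # current sink contents (complete mode: the full result table)

    def process_all_available() -> None:
        nonlocal result
        new_rows: List[Any] = []
        for src in sources:
            new_rows.extend(src.pending)
            src.pending.clear()
        if new_rows:  # only run a "batch" when there is new input
            consumed.extend(new_rows)
            result = query(consumed)

    for i, action in enumerate(actions):
        if isinstance(action, AddData):
            if action.source not in sources:
                sources.append(action.source)
            action.source.pending.extend(action.rows)
        elif isinstance(action, ProcessAllAvailable):
            process_all_available()
        elif isinstance(action, CheckAnswer):
            process_all_available()  # drain added data before comparing
            if Counter(result) != Counter(action.expected):
                raise AssertionError(
                    f"action #{i} {action}: expected {list(action.expected)}, got {result}")
        else:
            raise TypeError(f"unsupported action #{i}: {action!r}")
    return result


def word_count(words: List[str]) -> List[Row]:
    # Plays the role of groupBy("value").count() over all input seen so far.
    return list(Counter(words).items())
```

Keeping each action a passive record means that supporting a new action only requires a new branch in the driver. This mirrors the action-for-action structure the proposal borrows from Scala.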


*PR stack*

  ┌─────┬────────────────────────────┬────────────────────────────────────────────────────────────────────────┬───────┐
  │  #  │ Branch                     │ Scope                                                                  │ Tests │
  ├─────┼────────────────────────────┼────────────────────────────────────────────────────────────────────────┼───────┤
  │ 1   │ streamtest-py-1-bridge     │ JVM bridge + MemoryStream Python wrapper                               │ 11    │
  ├─────┼────────────────────────────┼────────────────────────────────────────────────────────────────────────┼───────┤
  │ 2   │ streamtest-py-2-base       │ StreamTest base + lifecycle/assertion actions                          │ +12   │
  ├─────┼────────────────────────────┼────────────────────────────────────────────────────────────────────────┼───────┤
  │ 3   │ streamtest-py-3-checks     │ CheckAnswer family + flexible-type expected values                     │ +26   │
  ├─────┼────────────────────────────┼────────────────────────────────────────────────────────────────────────┼───────┤
  │ 4   │ streamtest-py-4-rtm        │ Real-Time Mode (LowLatencyMemoryStream, ContinuousMemorySink, polling) │ +13   │
  └─────┴────────────────────────────┴────────────────────────────────────────────────────────────────────────┴───────┘

The full design doc, including the architecture diagram and usage examples,
lives at:

https://github.com/jerrypeng/spark/blob/stack/streamtest-py-5-design-doc/python/pyspark/testing/streaming/DESIGN.md

*Asks*

I would appreciate feedback on this effort. I mostly used Claude to help me
with this conversion.

It would be great to get reviews on the PRs, starting with the first one:

https://github.com/apache/spark/pull/55656

Thanks,
Jerry
