Hi dev@,

I'm working on a GSoC 2026 proposal for porting two streaming primitives to the Python SDK (#19137 UnboundedSource, #21521 Watch) and wanted to share my design for community feedback.
## Problem

The Python SDK lacks:

1. An SDF-based UnboundedSource wrapper (Java has UnboundedSourceAsSDFWrapperFn in Read.java).
2. A Watch transform for polling with dedup (Java has Watch.java, ~1,300 lines).

The closest Python workaround is PeriodicImpulse + a stateful ParDo (the pattern MatchContinuously uses), but it has two gaps:

- MatchContinuously's CombiningValueStateSpec dedup doesn't survive pipeline restarts on runners without persistent state backends (e.g. DirectRunner).
- The watermark is synthetic: it is wall-clock time from PeriodicImpulse, not grounded in the actual source data.

## Design Approach

**UnboundedSource SDF Wrapper:**

- Follows the existing BoundedSource SDF pattern in iobase.py (SDFBoundedSourceReader)
- Restriction carries (source, checkpoint, watermark)
- try_claim() delegates to reader.start()/advance()
- try_split() produces an EmptyUnboundedSource primary + checkpoint residual
- Checkpoint finalization is deferred to bundle commit via BundleFinalizerParam
- ManualWatermarkEstimator is fed by reader.getWatermark()
- ProcessContinuation.resume(resume_delay=...) for runner re-invocation

**Watch Transform:**

- GrowthState hierarchy: PollingGrowthState (active; carries the dedup hash set + termination state) / NonPollingGrowthState (bounded replay after a split)
- Three-case try_split() matching Java's GrowthTracker semantics
- Watermarks grounded in PollResult, not wall-clock time
- Composable TerminationConditions (AfterTotalOf, AfterTimeSinceNewOutput, AfterIterations)
- Content-hash dedup (Murmur3 128-bit for Java compatibility, SHA-256 fallback for stdlib-only)

**Why native Python (not just xlang)?**

- Python UnboundedSource subclasses can't run in a Java expansion service
- PollFn must be Python-serializable, not Java-serializable
- Debugging/testing without a Java toolchain
- No expansion service dependency, which keeps pipelines portable

## PoC

I've built a working proof of concept validating both designs: https://github.com/PDGGK/beam-python-streaming-poc

- 68 tests passing against Beam 2.71
- 4 runnable examples, including a restart-resilience demo comparing Watch's checkpoint-based dedup with PeriodicImpulse's volatile state
- Design document with a Java-to-Python component mapping

## Open Questions

1. For the UnboundedSource wrapper, should the busy-poll case (advance() returns False repeatedly) use ProcessContinuation.resume(resume_delay=...) or defer_remainder? Which is honored by portable runners today?
2. For Watch, should the dedup hash use mmh3 (matches Java's Murmur3, requires a C extension) or hashlib.sha256 (stdlib only, no external dependency)?
3. Is there appetite for refactoring MatchContinuously to use Watch internally (behind a feature flag)?

I have 5 merged Beam PRs (#37681 KafkaIO GCS leak, #37458 WriteToKafka headers, #37356 backoff API, #37298 serialization errors, #37297 Ubuntu docs) and have been working with the SDF infrastructure in both the Java and Python SDKs. Feedback on the design approach is welcome.

Best,
Zihan Dai
GitHub: @PDGGK
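P.S. To make open question 2 and the composable TerminationConditions a bit more concrete, here is a minimal, stdlib-only sketch of the Watch dedup/termination bookkeeping. It is a hypothetical illustration, not code from the PoC and not a Beam API: `PollingState`, `poll_once`, `content_hash` and the condition helpers are made-up names. It assumes string elements, uses the SHA-256 fallback truncated to 128 bits, and keeps the state JSON-serializable to mimic the dedup set travelling inside the checkpoint/restriction.

```python
import hashlib
import json
from dataclasses import dataclass, field
from typing import Callable, Iterable, List, Optional, Set

# Hypothetical sketch only: none of these names are Beam APIs.


def content_hash(element: str) -> str:
    # SHA-256 truncated to 128 bits (the width of Java's Murmur3_128).
    # Same width only: these values are NOT interchangeable with mmh3/Java.
    return hashlib.sha256(element.encode("utf-8")).hexdigest()[:32]


@dataclass
class PollingState:
    """Stand-in for a PollingGrowthState: everything needed to resume
    polling lives here, so it can be carried in a checkpoint."""
    start_time: float
    seen: Set[str] = field(default_factory=set)
    iterations: int = 0
    last_new_output_time: Optional[float] = None

    def to_checkpoint(self) -> str:
        return json.dumps({
            "start_time": self.start_time,
            "seen": sorted(self.seen),
            "iterations": self.iterations,
            "last_new_output_time": self.last_new_output_time,
        })

    @classmethod
    def from_checkpoint(cls, blob: str) -> "PollingState":
        d = json.loads(blob)
        return cls(d["start_time"], set(d["seen"]), d["iterations"],
                   d["last_new_output_time"])


# A termination condition is a predicate over (state, now).
Condition = Callable[[PollingState, float], bool]


def after_iterations(n: int) -> Condition:
    return lambda s, now: s.iterations >= n


def after_total_of(seconds: float) -> Condition:
    return lambda s, now: now - s.start_time >= seconds


def after_time_since_new_output(seconds: float) -> Condition:
    # Choice made in this sketch: never fires before the first new output.
    return lambda s, now: (s.last_new_output_time is not None
                           and now - s.last_new_output_time >= seconds)


def either_of(*conditions: Condition) -> Condition:
    return lambda s, now: any(c(s, now) for c in conditions)


def poll_once(state: PollingState, poll_output: Iterable[str],
              now: float) -> List[str]:
    """Record one poll result; return only elements not seen before."""
    new = []
    for element in poll_output:
        h = content_hash(element)
        if h not in state.seen:
            state.seen.add(h)
            new.append(element)
    state.iterations += 1
    if new:
        state.last_new_output_time = now
    return new


# Usage: a fake, growing directory listing polled every 10 seconds.
listings = [["a.csv"], ["a.csv", "b.csv"], ["a.csv", "b.csv"], ["b.csv", "c.csv"]]
stop = either_of(after_iterations(10), after_time_since_new_output(30.0))
state = PollingState(start_time=0.0)
emitted = []
for i, listing in enumerate(listings):
    now = 10.0 * i
    emitted.append(poll_once(state, listing, now))
    if stop(state, now):
        break

# Simulated restart: restore from the checkpoint and re-poll the last listing.
restored = PollingState.from_checkpoint(state.to_checkpoint())
print(emitted)  # [['a.csv'], ['b.csv'], [], ['c.csv']]
print(poll_once(restored, ["b.csv", "c.csv"], 40.0))  # []
```

The last two lines show the restart property: because the hash set is part of the serialized state rather than runner-managed ValueState, a restored poller re-emits nothing it has already seen. Switching to mmh3 would only change `content_hash`.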
