Hi dev@,

I'm working on a GSoC 2026 proposal for porting two streaming
primitives to the Python SDK (#19137 UnboundedSource, #21521 Watch)
and wanted to share my design for community feedback.

## Problem

The Python SDK lacks:
1. An SDF-based UnboundedSource wrapper (Java has
UnboundedSourceAsSDFWrapperFn in Read.java)
2. A Watch transform for polling with dedup (Java has Watch.java, ~1,300 lines)

The closest Python workaround is PeriodicImpulse + a stateful ParDo (the
approach MatchContinuously takes), but it has two gaps: MatchContinuously's
CombiningValueStateSpec dedup doesn't survive pipeline restarts on runners
without a persistent state backend (e.g. DirectRunner), and the watermark is
synthetic (wall-clock time from PeriodicImpulse, not grounded in actual
source data).

## Design Approach

**UnboundedSource SDF Wrapper:**
- Follows the existing BoundedSource SDF pattern in iobase.py
(SDFBoundedSourceReader)
- Restriction carries (source, checkpoint, watermark)
- try_claim() delegates to reader.start() on the first claim and
reader.advance() on subsequent claims
- try_split() produces EmptyUnboundedSource primary + checkpoint residual
- Checkpoint finalization deferred to bundle commit via BundleFinalizerParam
- ManualWatermarkEstimator fed by reader.getWatermark()
- ProcessContinuation.resume(resume_delay=...) for runner re-invocation
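To make the claim/split contract above concrete, here is a minimal pure-Python model of the restriction tracker. It deliberately avoids Beam APIs: `CounterReader`, `UnboundedRestriction`, `UnboundedTracker` and `make_reader` are hypothetical names, the reader is a toy counter, and watermark and finalization handling are simplified. It is a sketch of the idea, not the wrapper itself.

```python
from dataclasses import dataclass
from typing import Callable, List, Optional, Tuple


class CounterReader:
    """Hypothetical toy reader mimicking the start()/advance() contract.

    Both methods return True if a record is available now, and False if the
    source is (temporarily) out of data.
    """

    def __init__(self, start_at: int, limit: int):
        self._next = start_at
        self._limit = limit  # how many records exist "right now"
        self.current: Optional[int] = None

    def start(self) -> bool:
        return self.advance()

    def advance(self) -> bool:
        if self._next >= self._limit:
            return False
        self.current = self._next
        self._next += 1
        return True

    def checkpoint(self) -> int:
        # Toy checkpoint: position of the next unread record.
        return self._next

    def watermark(self) -> int:
        # Toy watermark: last position handed out (-1 before any record).
        return self._next - 1


@dataclass(frozen=True)
class UnboundedRestriction:
    source: Optional[str]      # None stands in for EmptyUnboundedSource
    checkpoint: Optional[int]  # where a new reader should resume
    watermark: int


class UnboundedTracker:
    """Hypothetical tracker over a (source, checkpoint, watermark) restriction."""

    def __init__(self, restriction: UnboundedRestriction,
                 make_reader: Callable[[UnboundedRestriction], CounterReader]):
        self.restriction = restriction
        self._make_reader = make_reader
        self._reader: Optional[CounterReader] = None
        self.claimed: List[int] = []

    def try_claim(self) -> bool:
        if self.restriction.source is None:
            return False  # empty source: nothing left to read here
        if self._reader is None:
            self._reader = self._make_reader(self.restriction)
            has_record = self._reader.start()    # first claim
        else:
            has_record = self._reader.advance()  # subsequent claims
        if has_record:
            self.claimed.append(self._reader.current)
        return has_record

    def try_split(self) -> Optional[Tuple[UnboundedRestriction, UnboundedRestriction]]:
        if self.restriction.source is None:
            return None  # already split; nothing to hand off
        if self._reader is not None:
            checkpoint, watermark = self._reader.checkpoint(), self._reader.watermark()
        else:
            checkpoint, watermark = self.restriction.checkpoint, self.restriction.watermark
        # Primary becomes empty (this invocation stops reading); the residual
        # resumes from the reader's checkpoint in a later invocation.
        primary = UnboundedRestriction(None, None, watermark)
        residual = UnboundedRestriction(self.restriction.source, checkpoint, watermark)
        self.restriction = primary
        return primary, residual


def make_reader(r: UnboundedRestriction) -> CounterReader:
    start = r.checkpoint if r.checkpoint is not None else 0
    return CounterReader(start_at=start, limit=5)


tracker = UnboundedTracker(UnboundedRestriction("counter", None, -1), make_reader)
while tracker.try_claim():
    pass  # a real DoFn would emit the record here
primary, residual = tracker.try_split()
```

After the reader runs dry, the split leaves an empty primary and a residual carrying checkpoint 5, so a later invocation picks up exactly where this one stopped.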

**Watch Transform:**
- GrowthState hierarchy: PollingGrowthState (active, carries dedup
hash set + termination state) / NonPollingGrowthState (bounded replay
after split)
- Three-case try_split() matching Java's GrowthTracker semantics
- PollResult-grounded watermarks (not wall-clock)
- Composable TerminationConditions (AfterTotalOf,
AfterTimeSinceNewOutput, AfterIterations)
- Content-hash dedup (Murmur3 128-bit for Java compat, SHA-256
fallback for stdlib-only)
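A rough pure-Python sketch of the polling side may help: dedup against a hash set carried in the state, a watermark taken from the PollResult, and composable termination conditions. All names here are hypothetical, and the snake_case helpers only loosely mirror Java's Growth combinators. NonPollingGrowthState and the three-case try_split() are not modeled, and SHA-256 truncated to 128 bits stands in for the content hash.

```python
import hashlib
from dataclasses import dataclass, field
from typing import Callable, List, Optional, Set, Tuple


@dataclass
class PollResult:
    outputs: List[bytes]
    watermark: Optional[float] = None  # reported by the poll fn, not wall-clock
    complete: bool = False             # poll fn says no more output will appear


@dataclass
class TerminationState:
    started_at: float
    last_new_output_at: float
    iterations: int = 0


# A termination condition inspects the state and the current time.
Condition = Callable[[TerminationState, float], bool]


def after_total_of(seconds: float) -> Condition:
    return lambda s, now: now - s.started_at >= seconds


def after_time_since_new_output(seconds: float) -> Condition:
    return lambda s, now: now - s.last_new_output_at >= seconds


def after_iterations(n: int) -> Condition:
    return lambda s, now: s.iterations >= n


def either_of(*conds: Condition) -> Condition:
    return lambda s, now: any(c(s, now) for c in conds)


def all_of(*conds: Condition) -> Condition:
    return lambda s, now: all(c(s, now) for c in conds)


def content_hash(value: bytes) -> bytes:
    # Stdlib-only stand-in: SHA-256 truncated to 128 bits.
    return hashlib.sha256(value).digest()[:16]


@dataclass
class PollingGrowthState:
    termination: TerminationState
    seen_hashes: Set[bytes] = field(default_factory=set)
    watermark: Optional[float] = None


def poll_once(state: PollingGrowthState, result: PollResult, now: float,
              stop_when: Condition) -> Tuple[List[bytes], bool]:
    """Dedups one poll against the state; returns (new outputs, should_stop)."""
    new_outputs = []
    for item in result.outputs:
        h = content_hash(item)
        if h not in state.seen_hashes:
            state.seen_hashes.add(h)
            new_outputs.append(item)
    state.termination.iterations += 1
    if new_outputs:
        state.termination.last_new_output_at = now
    if result.watermark is not None:
        # Watermark comes from the PollResult and is kept monotonic.
        prev = state.watermark
        state.watermark = result.watermark if prev is None else max(prev, result.watermark)
    return new_outputs, result.complete or stop_when(state.termination, now)


stop = either_of(after_iterations(3), after_time_since_new_output(60.0))
state = PollingGrowthState(TerminationState(started_at=0.0, last_new_output_at=0.0))
new1, done1 = poll_once(state, PollResult([b"a", b"b"], watermark=10.0), 5.0, stop)
new2, done2 = poll_once(state, PollResult([b"a", b"b", b"c"], watermark=20.0), 10.0, stop)
new3, done3 = poll_once(state, PollResult([b"a", b"b", b"c"]), 15.0, stop)
```

Because the hash set and termination counters live in the state rather than in runner-managed ParDo state, checkpointing the restriction is what carries dedup across a restart in this model.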

**Why native Python (not just xlang)?**
- Python UnboundedSource subclasses can't run in Java expansion service
- PollFn must be Python-serializable, not Java-serializable
- Debugging/testing without Java toolchain
- No expansion service dependency for pipeline portability

## PoC

I've built a working proof of concept validating both designs:
https://github.com/PDGGK/beam-python-streaming-poc

- 68 tests passing against Beam 2.71
- 4 runnable examples including a restart-resilience demo comparing
Watch checkpoint-based dedup vs PeriodicImpulse volatile state
- Design document with Java-to-Python component mapping

## Open Questions

1. For the UnboundedSource wrapper, should the busy-poll case
(advance() returns False repeatedly) use
ProcessContinuation.resume(resume_delay=...) or defer_remainder? Which
is honored by portable runners today?

2. For Watch, should the dedup hash use mmh3 (matches Java's Murmur3,
requires C extension) or hashlib.sha256 (stdlib only, no external
dep)?
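One possible middle ground is to prefer mmh3 when it is importable and fall back to hashlib otherwise. A minimal sketch follows; `pick_dedup_hash` and the algorithm labels are hypothetical. Whether `mmh3.hash_bytes` matches Java's Murmur3 output byte-for-byte, and whether both SDKs encode outputs identically before hashing, are assumptions that would need checking.

```python
import hashlib
from typing import Callable, Tuple


def _sha256_128(data: bytes) -> bytes:
    # Truncate SHA-256 to 128 bits so both backends yield 16-byte keys.
    return hashlib.sha256(data).digest()[:16]


def pick_dedup_hash() -> Tuple[str, Callable[[bytes], bytes]]:
    """Prefer mmh3 (Murmur3 x64 128-bit) if installed, else use the stdlib."""
    try:
        import mmh3  # optional C extension
    except ImportError:
        return "sha256-128", _sha256_128
    return "murmur3-x64-128", lambda data: mmh3.hash_bytes(data)


algo, dedup_hash = pick_dedup_hash()
key = dedup_hash(b"example-output")
```

If a fallback like this were used, the algorithm label would probably need to be persisted alongside the hash set: a restart on a worker image with a different backend would otherwise treat every previously seen output as new.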

3. Is there appetite for refactoring MatchContinuously to use Watch
internally (behind a feature flag)?

I have 5 merged Beam PRs (#37681 KafkaIO GCS leak, #37458 WriteToKafka
headers, #37356 backoff API, #37298 serialization errors, #37297
Ubuntu docs) and have been working with the SDF infrastructure in both
Java and Python SDKs.

Feedback welcome on the design approach.

Best,
Zihan Dai
GitHub: @PDGGK
