Eliaaazzz opened a new pull request, #38723: URL: https://github.com/apache/beam/pull/38723
Adds a Splittable-DoFn wrapper that brings Java's ``UnboundedSource`` / ``UnboundedReader`` / ``CheckpointMark`` abstractions to the Python SDK and makes them runnable on the portable Fn API (DirectRunner / FnApiRunner). Wires the new source type into ``iobase.Read.expand()`` so ``p | beam.io.Read(my_unbounded_source)`` dispatches alongside the existing ``BoundedSource`` branch.

Loosely inspired by Java's ``Read.UnboundedSourceAsSDFWrapperFn`` -- not a literal port. The streaming-SDF template followed for the process loop / watermark / defer plumbing is ``apache_beam.transforms.periodicsequence``.

Status: **draft** -- opening for early feedback from @yhu (mentor) while a design-first thread goes to the ``dev@`` mailing list in parallel. The MVP is deliberately small; see the "Out of scope" list below.

addresses #19137

## What's in this PR

Two new files under ``sdks/python/apache_beam/io/``:

* ``unbounded_source.py`` (~745 lines incl. docstrings): public ABCs (``CheckpointMark``, ``UnboundedReader``, ``UnboundedSource``, ``ReadFromUnboundedSource``) + SDF wrapper internals (``_UnboundedSourceRestriction``, ``_UnboundedSourceRestrictionCoder``, ``_UnboundedSourceRestrictionTracker``, ``_UnboundedSourceRestrictionProvider``).
* ``unbounded_source_test.py`` (~1130 lines): unit + integration coverage.

Plus a small change to ``iobase.py`` (~26 lines):

* ``Read.expand()`` gains an ``UnboundedSource`` branch (lazy import to break the ``iobase`` <-> ``unbounded_source`` cycle) that delegates to ``ReadFromUnboundedSource``.
* ``Read.to_runner_api_parameter`` widens the source ``isinstance`` to ``(BoundedSource, UnboundedSource)``, writing ``READ.urn`` + ``IsBounded.UNBOUNDED``. Decoding rides the existing ``PICKLED_SOURCE`` URN registered on ``SourceBase``.

## Correctness highlights

* **Watermark on data path** uses ``reader.get_watermark()`` (Java ``Read.java:594`` parity), not the per-record event time.
  The holder is ``(value, record_ts, source_wm)``; the record event time labels the output ``TimestampedValue``, and the source watermark advances the estimator.
* **Restriction has separate channels** for resume (``checkpoint_mark``) and commit hook (``finalization_checkpoint_mark``) so a done primary's finalize callback cannot contaminate the residual's resume state.
* **Reader is closed** on every exit path -- EOF and split paths close inside the tracker; ``try_claim`` / ``try_split`` wrap reader-method calls and close before re-raising; the DoFn's ``finally`` provides defense-in-depth for downstream yield exceptions via the SDF wrapper's private chain, with an ``isinstance`` guard and a warning log if the chain is ever refactored upstream.
* **EOF advances the watermark estimator to ``MAX_TIMESTAMP``** so downstream event-time windows can close (they would otherwise hang at the last reported watermark).
* **Initial fan-out** via ``UnboundedSource.split(desired_num_splits=20, options)`` -- validates that returned sub-sources are ``UnboundedSource`` instances (raises ``TypeError`` if not, outside the split-refusal ``except``); on split-refusal exceptions, falls back to a single restriction and logs a WARNING.
* **``default_output_coder`` wired** via ``coders.registry.register_coder`` + ``element_type`` so a custom source-declared coder reaches the output PCollection through Beam's standard registry lookup (parameterised coders still require explicit user registration; logged as a warning).
* **``poll_interval_seconds`` validated** to be > 0 in ``ReadFromUnboundedSource.__init__``.

## Test coverage

58 tests, all green locally on Python 3.13 + Beam 2.71:

* ``unbounded_source_test.py`` (42): ABC contracts; restriction coder round-trip; restriction tracker state machine (claim / split / EOF / no-data / check_done / progress / is_bounded); finalize idempotency; source-watermark vs.
  record-timestamp regression; finalize/resume channel separation; tracker-internal exception close on ``reader.advance`` and ``reader.get_watermark`` failures; DoFn generator close path (unit + integration with downstream raising ``Map``); cloudpickle round-trip for transform and source; circular import in three orderings via subprocess + tempfile; e2e DirectRunner pipeline (records in order + windowed GroupByKey).
* ``iobase_test.py`` (+3): ``Read(UnboundedSource)`` dispatch through the new ``expand`` branch; ``Read.to_runner_api / from_runner_api`` round-trip with ``IsBounded.UNBOUNDED``; PCollection ``is_bounded`` assertion.

## Out of scope (deferred to W2+, tracked under #19137)

Listed exhaustively in the module docstring at ``sdks/python/apache_beam/io/unbounded_source.py``:

* Record-id-based deduplication (Java's ``ValueWithRecordId``).
* Backlog-byte reporting (``restriction_size`` is constant 1; ``current_progress`` is binary 0.0 / 1.0).
* Dynamic split fractions / runner-initiated work stealing.
* Source-specific checkpoint coders threaded through the SDF restriction coder (today the coder always pickles checkpoint marks via ``_MemoizingPickleCoder`` regardless of the source's ``get_checkpoint_mark_coder``).
* Reader caching across bundles (Java caches readers via a Guava cache; this PoC always rebuilds the reader from the checkpoint).
* ``EmptyUnboundedSource`` terminal-state marker (we use an ``is_done`` flag on the restriction instead).
* Runner-side ``IsBounded.UNBOUNDED`` dispatch in ``bundle_processor.IMPULSE_READ_TRANSFORM``. Today the wire format round-trips correctly but execution flows through the composite's expanded sub-transforms (``Impulse | Map | SDF-ParDo``), not the URN handler.

------------------------

- [x] Mention the appropriate issue in your description -- ``addresses #19137``.
- [ ] Update ``CHANGES.md`` with noteworthy changes -- intentionally deferred until W1/W2 scope is finalised on ``dev@``; will add before the PR moves out of draft.
- [ ] ICLA -- to be confirmed by mentor (large contribution under GSoC 2026).

cc @yhu for mentor review.
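
For reviewers who want a quick mental model of the "Watermark on data path" and "EOF advances the watermark estimator" highlights, here is a minimal stand-alone sketch. It is not code from this PR and does not import Beam: ``Holder``, ``SketchWatermarkEstimator``, ``emit``, ``on_eof`` and ``SKETCH_MAX_TIMESTAMP`` are hypothetical names, timestamps are plain floats, and the never-move-backwards clamp in the estimator is an assumption about how such an estimator would typically behave.

```python
from typing import Any, NamedTuple, Tuple

# Stand-in for Beam's MAX_TIMESTAMP (assumption: a float is enough for the sketch).
SKETCH_MAX_TIMESTAMP = float("inf")


class Holder(NamedTuple):
    """Hypothetical (value, record_ts, source_wm) triple described in the PR text."""
    value: Any
    record_ts: float   # event time of this record
    source_wm: float   # watermark reported by the reader when the record was read


class SketchWatermarkEstimator:
    """Hypothetical estimator that only ever moves forward (assumed behaviour)."""

    def __init__(self) -> None:
        self._watermark = float("-inf")

    def observe(self, watermark: float) -> None:
        # Keep the larger of the current and the newly observed watermark.
        self._watermark = max(self._watermark, watermark)

    def current_watermark(self) -> float:
        return self._watermark


def emit(holder: Holder, estimator: SketchWatermarkEstimator) -> Tuple[Any, float]:
    """Label the output with the record time; advance the estimator with the source watermark."""
    estimator.observe(holder.source_wm)
    return (holder.value, holder.record_ts)


def on_eof(estimator: SketchWatermarkEstimator) -> None:
    """At end of input, push the watermark to the maximum so windows can close."""
    estimator.observe(SKETCH_MAX_TIMESTAMP)


if __name__ == "__main__":
    est = SketchWatermarkEstimator()
    # The first record's event time (10.0) is ahead of the source watermark (5.0).
    outputs = [emit(h, est) for h in (Holder("a", 10.0, 5.0), Holder("b", 7.0, 6.0))]
    print(outputs, est.current_watermark())
    on_eof(est)
    print(est.current_watermark())
```

The point of the sketch is that the estimator tracks the source watermark rather than the largest record timestamp seen, so a record whose event time runs ahead of the source watermark does not drag the watermark forward with it.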
