Eliaaazzz opened a new pull request, #38724:
URL: https://github.com/apache/beam/pull/38724

   Brings Java's ``UnboundedSource`` / ``UnboundedReader`` / ``CheckpointMark``
   abstractions to the Python SDK as a Splittable-DoFn wrapper runnable on the
   portable Fn API (DirectRunner / FnApiRunner). Wires the new source type into
   ``iobase.Read.expand()`` so ``p | beam.io.Read(my_unbounded_source)``
   dispatches alongside the existing ``BoundedSource`` branch. Loosely inspired
   by Java's ``Read.UnboundedSourceAsSDFWrapperFn``; the streaming-SDF template
   followed for the process loop / watermark / defer plumbing is
   ``apache_beam.transforms.periodicsequence``.
   
   addresses #19137
   
   ## What's in this PR
   
   Two new files under ``sdks/python/apache_beam/io/``:
   
   * ``unbounded_source.py``: public ABCs (``CheckpointMark``,
     ``UnboundedReader``, ``UnboundedSource``, ``ReadFromUnboundedSource``)
     plus SDF wrapper internals (``_UnboundedSourceRestriction``,
     ``_UnboundedSourceRestrictionCoder``,
     ``_UnboundedSourceRestrictionTracker``,
     ``_UnboundedSourceRestrictionProvider``).
   * ``unbounded_source_test.py``: 42 deterministic unit + integration tests.
   
   Plus a small change to ``iobase.py``:
   
   * ``Read.expand`` gains an ``UnboundedSource`` branch with a function-local
     lazy import to break the ``iobase`` <-> ``unbounded_source`` cycle.
   * ``Read.to_runner_api_parameter`` widens the source ``isinstance`` to
     ``(BoundedSource, UnboundedSource)``, writing ``READ.urn`` +
     ``ReadPayload(is_bounded=UNBOUNDED)``. Decode rides the existing
     ``PICKLED_SOURCE`` URN registered on ``SourceBase``. Runner-side
     ``IsBounded.UNBOUNDED`` dispatch in
     ``bundle_processor.IMPULSE_READ_TRANSFORM`` is W2 work; today execution
     flows through the composite's expanded ``Impulse | Map | SDF-ParDo``.
   
   ## Correctness highlights
   
   * **Data-path watermark** uses ``reader.get_watermark()`` (Java
     ``Read.java:594`` parity), not the per-record event time. Holder is
     ``(value, record_ts, source_wm)``; the record event time labels the
     emitted ``TimestampedValue``, the source watermark advances the estimator.
   * **Restriction has separate channels** for resume (``checkpoint_mark``)
     and commit hook (``finalization_checkpoint_mark``) so a done primary's
     finalize callback cannot contaminate the residual's resume state.
   * **Reader is closed on every exit path** -- EOF and split paths close
     inside the tracker; ``try_claim`` / ``try_split`` wrap reader-method
     calls and close before re-raising; the DoFn ``finally`` adds
     defense-in-depth for exceptions raised at a downstream ``yield``. It
     reaches the reader through the SDF wrapper's private chain, guarded by
     an ``isinstance`` check, and logs a warning if that chain is ever
     refactored upstream.
   * **EOF advances the watermark estimator to ``MAX_TIMESTAMP``** so
     downstream event-time windows can close (they would otherwise hang at
     the last reported source watermark).
   * **Initial fan-out** via ``UnboundedSource.split(desired_num_splits=20,
     options)`` -- validates returned sub-sources are ``UnboundedSource``
     instances (raises ``TypeError`` outside the split-refusal ``except``);
     on split-refusal exceptions, falls back to a single restriction and
     logs a WARNING.
   * **``default_output_coder`` wired** via
     ``coders.registry.register_coder`` + ``element_type`` so a custom
     source-declared coder reaches the output PCollection through Beam's
     standard registry lookup. Parameterised coders that cannot be
     class-registered fall back gracefully with a warning -- users with such
     coders must register explicitly before pipeline construction.
   * **``poll_interval_seconds`` validated** to be > 0 in
     ``ReadFromUnboundedSource.__init__``.
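
   Three of the highlights above (data-path watermark, separate restriction
   channels, initial fan-out fallback) can be sketched in plain Python. This
   is an illustration under stated assumptions, not the code in this PR:
   ``Holder``, ``ManualEstimator``, ``Restriction``, ``checkpoint_split`` and
   ``initial_sources`` are hypothetical names, the ``UnboundedSource`` class
   is a stand-in for the real ABC, and treating ``NotImplementedError`` as the
   split-refusal signal is an assumption.

```python
import logging
from dataclasses import dataclass, replace
from typing import Any, NamedTuple, Optional


class Holder(NamedTuple):
    """Hypothetical (value, record_ts, source_wm) triple for one record."""
    value: Any
    record_ts: float  # event time; labels the emitted element
    source_wm: float  # reader.get_watermark(); drives the estimator


class ManualEstimator:
    """Stand-in for a manual watermark estimator; never moves backwards."""
    def __init__(self) -> None:
        self.watermark = float("-inf")

    def set_watermark(self, ts: float) -> None:
        self.watermark = max(self.watermark, ts)


def emit(holder: Holder, estimator: ManualEstimator):
    # The source watermark, not the record timestamp, advances the estimator.
    estimator.set_watermark(holder.source_wm)
    return holder.value, holder.record_ts


@dataclass(frozen=True)
class Restriction:
    checkpoint_mark: Optional[Any] = None               # resume channel
    finalization_checkpoint_mark: Optional[Any] = None  # commit-hook channel
    is_done: bool = False


def checkpoint_split(current: Restriction, mark: Any):
    # The done primary keeps the mark only for its finalize callback; the
    # residual gets it only for resuming, so the two never share state.
    primary = replace(current, finalization_checkpoint_mark=mark, is_done=True)
    residual = Restriction(checkpoint_mark=mark)
    return primary, residual


class UnboundedSource:
    """Minimal stand-in base class, present only for the isinstance check."""
    def split(self, desired_num_splits, options=None):
        raise NotImplementedError


def initial_sources(source, desired_num_splits=20, options=None):
    try:
        subs = list(source.split(desired_num_splits, options))
    except NotImplementedError:  # assumed refusal signal
        logging.warning("split refused; falling back to a single restriction")
        return [source]
    # Validation sits outside the try block, so the TypeError is not swallowed.
    for sub in subs:
        if not isinstance(sub, UnboundedSource):
            raise TypeError(f"split() returned {type(sub).__name__}")
    return subs
```

   In this sketch a record whose event time is ahead of the source watermark
   is still emitted with its own timestamp, while the estimator only moves to
   what the source reported.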
   
   ## Test coverage
   
   58 tests, all green locally on Python 3.13 + Beam 2.71:
   
   * ``unbounded_source_test.py`` (42): ABC contracts; restriction coder
     round-trip; restriction tracker state machine (claim / split / EOF /
     no-data / check_done / progress / is_bounded); finalize idempotency;
     source-watermark vs. record-timestamp regression; finalize / resume
     channel separation; tracker-internal exception close on
     ``reader.advance`` and ``reader.get_watermark`` failures; DoFn
     generator close path (unit + integration with downstream raising
     ``Map``); cloudpickle round-trip for transform and source; circular
     import in three orderings via subprocess + tempfile; end-to-end
     DirectRunner pipeline (records in order + windowed GroupByKey).
   * ``iobase_test.py`` (+3): ``Read(UnboundedSource)`` dispatch through the
     new ``expand`` branch; ``Read.to_runner_api`` / ``from_runner_api``
     round-trip with ``IsBounded.UNBOUNDED``; PCollection ``is_bounded``
     assertion.
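
   The circular-import check listed above (subprocess + tempfile) can be
   approximated as follows. This is a minimal sketch, not the test in this
   PR: ``mod_a`` and ``mod_b`` are hypothetical stand-ins for ``iobase`` and
   ``unbounded_source``, and the three orderings shown are assumed, since the
   description does not spell out which ones the real test uses.

```python
import os
import subprocess
import sys
import tempfile
import textwrap
from pathlib import Path

# mod_a plays the role of iobase: it imports mod_b only inside a function.
MOD_A = textwrap.dedent('''
    class Base:
        pass

    def expand(obj):
        # Function-local import: mod_b is loaded only when expand() runs.
        from mod_b import Child
        return isinstance(obj, Child)
''')

# mod_b plays the role of unbounded_source: it imports mod_a at module level.
MOD_B = textwrap.dedent('''
    import mod_a

    class Child(mod_a.Base):
        pass
''')

# Assumed orderings: a first, b first, and a first followed by a dispatch call.
ORDERINGS = [
    "import mod_a, mod_b",
    "import mod_b, mod_a",
    "import mod_a; import mod_b; assert mod_a.expand(mod_b.Child())",
]


def run_orderings():
    """Run each ordering in a fresh interpreter and collect the exit codes."""
    codes = []
    with tempfile.TemporaryDirectory() as tmp:
        Path(tmp, "mod_a.py").write_text(MOD_A)
        Path(tmp, "mod_b.py").write_text(MOD_B)
        env = dict(os.environ, PYTHONPATH=tmp)
        for code in ORDERINGS:
            proc = subprocess.run(
                [sys.executable, "-c", code],
                cwd=tmp, env=env, capture_output=True, text=True)
            codes.append(proc.returncode)
    return codes
```

   Running each ordering in its own subprocess matters because ``sys.modules``
   in the test process may already hold both modules, which would hide an
   import-order failure.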
   
   yapf 0.43.0 + isort 7.0.0 clean (Beam pinned versions, configs in
   ``sdks/python/setup.cfg`` and ``sdks/python/.isort.cfg``).
   
   ## Out of scope (deferred to W2+, tracked under #19137)
   
   Listed exhaustively in the module docstring at
   ``sdks/python/apache_beam/io/unbounded_source.py``:
   
   * Record-id-based deduplication (Java's ``ValueWithRecordId``).
   * Backlog-byte reporting (``restriction_size`` is constant 1;
     ``current_progress`` is binary 0.0 / 1.0).
   * Dynamic split fractions / runner-initiated work stealing.
   * Source-specific checkpoint coders threaded through the SDF restriction
     coder (today the coder pickles checkpoint marks via
     ``_MemoizingPickleCoder`` regardless of the source's
     ``get_checkpoint_mark_coder``).
   * Reader caching across bundles (Java caches readers via a Guava cache;
     this PoC always rebuilds the reader from the checkpoint).
   * ``EmptyUnboundedSource`` terminal-state marker (we use an ``is_done``
     flag on the restriction instead).
   * Runner-side ``IsBounded.UNBOUNDED`` dispatch in
     ``bundle_processor.IMPULSE_READ_TRANSFORM``. Today the wire format
     round-trips correctly but execution flows through the composite's
     expanded sub-transforms.
   
   ------------------------
   
    - [x] Mention the appropriate issue in your description -- ``addresses
      #19137``.
    - [ ] Update ``CHANGES.md`` with noteworthy changes -- intentionally
      deferred until W1/W2 scope is finalised on [email protected]; will
      add before merge.
    - [x] ICLA on file.
   
   cc @yhu for mentor review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
