On 7/29/21 6:47 PM, Reuven Lax wrote:


On Thu, Jul 29, 2021 at 9:40 AM Robert Bradshaw <rober...@google.com> wrote:

    On Sun, Jul 25, 2021 at 11:59 AM Jan Lukavský <je...@seznam.cz> wrote:
    >
    > I didn't want to say that Flink should not support SDF. I just
    don't see any benefit of it for a native streaming source - like
    Kafka - without the ability to use dynamic splitting.


    Another benefit is that SDFs need not be roots of the pipeline, e.g.
    one can read from a PCollection of Kafka topics.


In fact this was the main original benefit. Many users needed "dynamic" sources - e.g. one Kafka topic that publishes the list of Kafka topics you want to subscribe to. In the past these users hacked around this requirement by restarting their streaming pipelines nightly with the updated list of Kafka topics.
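To make this concrete, here is a minimal sketch (not code from this thread) of such a dynamic source using the SDF-based KafkaIO.readSourceDescriptors(). The control topic, broker address, and the single-partition simplification are made up for illustration, and the exact KafkaSourceDescriptor.of signature may differ between Beam versions:

import java.util.Collections;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.io.kafka.KafkaRecord;
import org.apache.beam.sdk.io.kafka.KafkaSourceDescriptor;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class DynamicKafkaTopics {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // Read topic names published on a control topic...
    PCollection<String> topics =
        pipeline
            .apply(KafkaIO.<String, String>read()
                .withBootstrapServers("broker:9092")
                .withTopic("topics-control")
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
            .apply(Values.create());

    // ...and subscribe to those topics dynamically - the source is no
    // longer a root of the pipeline, so no nightly restarts are needed.
    PCollection<KafkaRecord<String, String>> records =
        topics
            .apply(MapElements.via(
                new SimpleFunction<String, KafkaSourceDescriptor>() {
                  @Override
                  public KafkaSourceDescriptor apply(String topic) {
                    // Partition 0 only, for brevity.
                    return KafkaSourceDescriptor.of(
                        new TopicPartition(topic, 0), null, null, null, null,
                        Collections.singletonList("broker:9092"));
                  }
                }))
            .apply(KafkaIO.<String, String>readSourceDescriptors()
                .withKeyDeserializer(StringDeserializer.class)
                .withValueDeserializer(StringDeserializer.class));

    pipeline.run();
  }
}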
That makes sense. Agree.


    > The potential benefits of composability and extensibility do not
    apply here. Yes, it would be good to have as few source transforms
    as possible. And another yes, there probably isn't anything that
    would fundamentally prevent Flink from correctly supporting SDF. On
    the other hand, the current state is such that we cannot use
    KafkaIO in Flink. I think we should fix this by the shortest
    possible path, because the technically correct solution is
    currently unknown (at least to me - if anyone can give pointers
    about how to fix the SDF, I'd be grateful).
    >
    > I still think that enabling a runner to support Read natively,
    when appropriate, has value by itself. And it requires SDK Coders
    to be 'known' to the runner, at least that was the result of my tests.

    The right way to do this, which is completely in line with the model,
    is for the runner to recognize the SDF operation in question as
    wrapping a legacy Source, and swap it out for an implementation more
    suitable for its execution.

    Independently, we should make SDF work well on Flink, but we shouldn't
    block Flink on that.
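
    For Java runners there is already a hook for this kind of swap; a
    minimal sketch follows. The helper is the one mentioned later in
    this thread (convertReadBasedSplittableDoFnsToPrimitiveReads); its
    exact location and signature may differ between Beam versions.

    import org.apache.beam.runners.core.construction.SplittableParDo;
    import org.apache.beam.sdk.Pipeline;

    class ReadSwapSketch {
      // Before translating the pipeline, rewrite SDF expansions that
      // merely wrap a legacy Source back into the primitive Read, which
      // the runner then executes natively (this is what the Flink runner
      // does when use_deprecated_read is set).
      static void prepareForTranslation(Pipeline pipeline) {
        SplittableParDo.convertReadBasedSplittableDoFnsToPrimitiveReads(pipeline);
      }
    }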

    > On 7/25/21 8:31 PM, Chamikara Jayalath wrote:
    >
    >
    >
    > On Sun, Jul 25, 2021 at 11:09 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>
    >> In general, language-neutral APIs and protocols are a key
    feature of portable Beam.
    >>
    >> Yes, sure, that is well understood. But - language-neutral APIs
    require a language-neutral environment. That is why the portable
    Pipeline representation is built around protocol buffers and gRPC.
    That is truly language-neutral. Once we implement something around
    that - as in the case of ModelCoders.java - we use a specific
    language for it, and the language-neutral part is already gone.
    The decision to include same-language SDK coders in such a
    language-specific object plays no role in the fact that it already
    is language-specific.
    >>
    >> Not all runners are implemented using Java. For example, the
    portable DirectRunner (FnAPI runner) is implemented using Python
    and Dataflow is implemented using C++. Such runners will not be
    able to do this.
    >>
    >> Yes, I'm aware of that and that is why I said "any Java native
    runner". It is true that non-Java runners *must* (as long as we
    don't include Read in the SDK harness) resort to expanding it to
    SDF. That is why use_deprecated_read is an invalid setting for such
    runners and should be handled accordingly.
    >>
    >> Similarly, I think there were previous discussions related to
    using SDF as the source framework for portable runners.
    >>
    >> Don't get me wrong, I'm not trying to revoke this decision. On
    the other hand, I still think that the decision whether to use the
    SDF implementation of Read should be left to the runner.
    >>
    >> I understand that there are some bugs related to SDF and
    portable Flink currently. How much work do you think is needed
    here? Would it be better to focus our efforts on fixing remaining
    issues for SDF and portable runners instead of supporting
    "use_deprecated_read" for that path?
    >>
    >> I'm not sure. I don't know portability and the SDK harness well
    enough to be able to answer this. But we should really know why we
    do that. What exactly does SDF bring to the Flink runner - and,
    leaving Flink aside, what does it bring to runners that cannot make
    use of dynamic splitting, admittedly a very cool feature? Yes,
    supporting Java Read makes it impossible to implement it in Python.
    But practically, I think that most Pipelines will use x-lang for
    that. It makes a lot of sense to offload IOs to a more performant
    environment.
    >
    >
    > A bit old, but please see the following for the benefits of SDF
    and the motivation for it.
    >
    > https://beam.apache.org/blog/splittable-do-fn/
    > https://s.apache.org/splittable-do-fn
    >
    > Thanks,
    > Cham
    >
    >>  Jan
    >>
    >> On 7/25/21 6:54 PM, Chamikara Jayalath wrote:
    >>
    >>
    >>
    >> On Sun, Jul 25, 2021 at 6:33 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>>
    >>> I'll start from the end.
    >>>
    >>> I don't think we should be breaking language agnostic API
    layers (for example, definition of model coders) just to support
    "use_deprecated_read".
    >>>
    >>> "Breaking" and "fixing" can only be a matter of the definition
    of the object at hand. I don't think, that Coder can be totally
    language agnostic - yes, the mapping between serialized form and
    deserialized form can be _defined_ in a language agnostic way, but
    must be_implemented_ in a specific language. If we choose the
    implementing language, what makes us treat SDK-specific coders
    defined by the SDK of the same language as "unknown"? It is only
    our decision, that seems to have no practical benefits.
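    >>> To make that concrete, an illustrative sketch (not from this
    thread): the model pins down the byte format of e.g.
    beam:coder:varint:v1 in a language-agnostic spec, but anyone who
    touches the bytes needs an implementation in some concrete
    language - in Java, roughly the following (see the real
    org.apache.beam.sdk.coders.VarLongCoder):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    class VarIntCoderSketch {
      // The URN and byte format are defined language-agnostically in
      // the model; this class is the inherently language-specific part.
      static final String URN = "beam:coder:varint:v1";

      // Little-endian base-128 varint, as the spec defines.
      static void encode(long value, OutputStream os) throws IOException {
        while ((value & ~0x7FL) != 0) {
          os.write((int) ((value & 0x7F) | 0x80));
          value >>>= 7;
        }
        os.write((int) value);
      }

      static long decode(InputStream is) throws IOException {
        long result = 0;
        int shift = 0;
        int b;
        do {
          b = is.read();
          if (b < 0) throw new IOException("EOF while decoding varint");
          result |= (long) (b & 0x7F) << shift;
          shift += 7;
        } while ((b & 0x80) != 0);
        return result;
      }
    }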
    >>
    >>
    >> In general, language-neutral APIs and protocols are a key
    feature of portable Beam. See here:
    https://beam.apache.org/roadmap/portability/
    >> (I did not look into all the old discussions and votes related
    to this but I'm sure they are there)
    >>
    >>>
    >>> Moreover, including SDK-specific coders among the supported
    coders of the SDK's runner construction counterpart (that is,
    runners/core-construction-java for the Java SDK) is a necessary
    prerequisite for unifying "classical" and "portable" runners,
    because the runner needs to understand *all* SDK coders so that it
    can _inline_ the complete Pipeline (if the Pipeline SDK has the
    same language as the runner) instead of running it through the SDK
    harness. This need is therefore not specific to supporting
    use_deprecated_read; it is a generic requirement whose first
    manifestation happens to be a transform not supported by the SDK
    harness.
    >>>
    >>> I think "use_deprecated_read" should be considered a stop-gap
    measure for Flink (and Spark ?) till we have proper support for
    SDF. In fact I don't think an arbitrary portable runner can
    support "use_deprecated_read" due to the following.
    >>>
    >>> There seems to be nothing special about Flink regarding the
    support of primitive Read. I think any Java native runner can
    implement it pretty much the same way as Flink does. The question
    is whether any other runner would want to do that. The problem with
    Flink is that
    >>
    >>
    >> Not all runners are implemented using Java. For example, the
    portable DirectRunner (FnAPI runner) is implemented using Python
    and Dataflow is implemented using C++. Such runners will not be
    able to do this.
    >>>
    >>>  1) portable SDF seems not to work [1]
    >>>
    >>>  2) even the classical Flink runner still has issues with SDF -
    there are reports of the watermark being stuck when reading data
    via SDF; this gets resolved by using use_deprecated_read
    >>>
    >>>  3) Flink actually gains nothing from SDF, because it cannot
    make use of dynamic splitting, so SDF brings only an implementation
    burden without any practical benefit
    >>
    >> Similarly, I think there were previous discussions related to
    using SDF as the source framework for portable runners.
    >> I understand that there are some bugs related to SDF and
    portable Flink currently. How much work do you think is needed
    here? Would it be better to focus our efforts on fixing remaining
    issues for SDF and portable runners instead of supporting
    "use_deprecated_read" for that path? Note that I'm fine with
    fixing any issues related to "use_deprecated_read" for classic
    (non-portable) Flink, but I think you are trying to use x-lang,
    hence you probably need portable Flink.
    >>
    >> Thanks,
    >> Cham
    >>
    >>>
    >>> I think that we should revisit the decision to deprecate Read -
    if we can implement it via SDF, what is the reason to forbid a
    runner from making use of a simpler implementation? The expansion
    of Read might be runner-dependent - that is something we do all the
    time - or am I missing something?
    >>>
    >>>  Jan
    >>>
    >>> [1] https://issues.apache.org/jira/browse/BEAM-10940
    >>>
    >>> On 7/25/21 1:38 AM, Chamikara Jayalath wrote:
    >>>
    >>> I think we might be going down a bit of a rabbit hole with the
    support for "use_deprecated_read" for portable Flink :)
    >>>
    >>> I think "use_deprecated_read" should be considered a stop-gap
    measure for Flink (and Spark ?) till we have proper support for
    SDF. In fact I don't think an arbitrary portable runner can
    support "use_deprecated_read" due to the following.
    >>>
    >>> (1) SDK Harness is not aware of BoundedSource/UnboundedSource.
    The only source framework the SDK Harness is aware of is SDF.
    >>> (2) Invoking BoundedSource/UnboundedSource is not a part of
    the Fn API
    >>> (3) A non-Java Beam portable runner will probably not be able
    to directly invoke legacy Read transforms similar to the way Flink
    does today.
    >>>
    >>> I don't think we should be breaking language agnostic API
    layers (for example, definition of model coders) just to support
    "use_deprecated_read".
    >>>
    >>> Thanks,
    >>> Cham
    >>>
    >>> On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>>>
    >>>> On 7/24/21 12:34 AM, Robert Bradshaw wrote:
    >>>>
    >>>> >   On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský <je...@seznam.cz> wrote:
    >>>> >> Hi,
    >>>> >>
    >>>> >> this was a ride. But I managed to get that working. I'd
    like to discuss two points, though:
    >>>> >>
    >>>> >>   a) I had to push Java coders to ModelCoders for Java
    (which makes sense to me, but is that correct?). See [1]. It is
    needed so that the Read transform (executed directly in the
    TaskManager) can correctly communicate with the Java SDK harness
    using custom coders (which is tested here [2]).
    >>>> > I think the intent was that ModelCoders represent the set of
    language-agnostic coders in the model, though I have to admit I've
    always been a bit fuzzy on when a coder must or must not be in
    that list.
    >>>> I think that this definition works as long as the runner does
    not itself interfere with the Pipeline. Once the runner starts (by
    itself, not via SdkHarnessClient) producing data, it starts to be
    part of the environment, and therefore it should understand its own
    Coders. I'd propose the definition of "model coders" to be the
    Coders that the SDK is able to understand; for the ModelCoders
    located in "core-construction-java" it then follows naturally that
    they should include the Java SDK Coders.
    >>>> >
    >>>> >>   b) I'd strongly prefer if we moved the handling of
    use_deprecated_read from outside of the Read PTransform directly
    into its expand method, see [3]. Though this is not needed for the
    Read on Flink to work, it seems cleaner.
    >>>> >>
    >>>> >> WDYT?
    >>>> > The default value of use_deprecated_read should depend on
    the runner (e.g. some runners don't work well with it, others
    require it). As such, it should not be visible to the PTransform's
    expand.
    >>>> I think we should decide what the expected outcome is. If a
    runner does not support primitive Read (and therefore
    use_deprecated_read), what should we do if such an experiment is
    set? Should the Pipeline fail, or should it be silently ignored? I
    think that we should fail, because the user expects something that
    cannot be fulfilled. Therefore, we have two options - handling the
    experiment explicitly in runners that do not support it, or
    handling it explicitly in all cases (both supported and
    unsupported). The latter case is where we force runners to call an
    explicit conversion method (convertPrimitiveRead....). Every runner
    that does not support primitive Read must handle the experiment
    either way, because otherwise the experiment would simply be
    silently ignored, which is not exactly user-friendly.
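    >>>> A sketch of the failing-fast variant, for illustration
    (ExperimentalOptions.hasExperiment is existing Beam API; the
    validation hook around it is hypothetical):

    import org.apache.beam.sdk.options.ExperimentalOptions;
    import org.apache.beam.sdk.options.PipelineOptions;

    class ReadSupportValidation {
      // A runner that cannot execute the primitive Read rejects the
      // pipeline up front instead of silently ignoring the experiment.
      static void validateReadSupport(PipelineOptions options) {
        if (ExperimentalOptions.hasExperiment(options, "use_deprecated_read")) {
          throw new IllegalArgumentException(
              "use_deprecated_read is set, but this runner does not support "
                  + "the primitive Read transform.");
        }
      }
    }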
    >>>> >
    >>>> >>   Jan
    >>>> >>
    >>>> >> [1]
    https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
    >>>> >>
    >>>> >> [2]
    https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
    >>>> >>
    >>>> >> [3]
    https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
    >>>> >>
    >>>> >> On 7/18/21 6:29 PM, Jan Lukavský wrote:
    >>>> >>
    >>>> >> Hi,
    >>>> >>
    >>>> >> I was debugging the issue and it relates to pipeline
    fusion - it seems that the primitive Read transform gets fused and
    is then 'missing' as a source. I'm a little lost in the code, but
    the strangest parts are that:
    >>>> >>
    >>>> >>   a) I tried to reject fusion of primitive Read by adding
    GreedyPCollectionFusers::cannotFuse for
    PTransformTranslation.READ_TRANSFORM_URN to
    GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but that didn't
    change the exception
    >>>> >>
    >>>> >>   b) I tried adding Reshuffle.viaRandomKey between Read
    and PAssert, but that didn't change it either
    >>>> >>
    >>>> >>   c) when I run a portable Pipeline with use_deprecated_read
    on Flink, it actually runs (though it fails when it actually reads
    any data; if the input is empty, the job runs), so it does not hit
    the same issue, which is a mystery to me
    >>>> >>
    >>>> >> If anyone has any pointers that I can investigate, I'd be
    really grateful.
    >>>> >>
    >>>> >> Thanks in advance,
    >>>> >>
    >>>> >>   Jan
    >>>> >>
    >>>> >>
    >>>> >>
    >>>> >> On 7/16/21 2:00 PM, Jan Lukavský wrote:
    >>>> >>
    >>>> >> Hi,
    >>>> >>
    >>>> >> I hit another issue with the portable Flink runner. Long
    story short - reading from Kafka is not working in portable Flink.
    I first had to solve issues with the expansion service
    configuration (the ability to add the use_deprecated_read option),
    because the portable Flink runner has issues with SDF [1], [2].
    After being able to inject use_deprecated_read into the expansion
    service I was able to get an execution DAG that has the
    UnboundedSource, but then more and more issues appeared (probably
    related to a missing LengthPrefixCoder somewhere - maybe at the
    output of the primitive Read). I wanted to create a test for it
    and found out that there actually is a ReadSourcePortableTest in
    FlinkRunner, but _it tests nothing_. The problem is that Read is
    transformed to SDF, so this test tests the SDF, not the Read
    transform. As a result, the Read transform does not work.
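    >>>> >> For context, the kind of job in question is roughly the
    following sketch (broker address, topic, and job endpoint are
    placeholders):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Values;
    import org.apache.kafka.common.serialization.LongDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class PortableKafkaRead {
      public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.fromArgs(
                "--runner=PortableRunner",
                "--jobEndpoint=localhost:8099",
                "--experiments=use_deprecated_read")
            .create();
        Pipeline p = Pipeline.create(options);
        p.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("test-topic")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .withoutMetadata())
            .apply(Values.create());
        p.run().waitUntilFinish();
      }
    }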
    >>>> >>
    >>>> >> I tried using
    convertReadBasedSplittableDoFnsToPrimitiveReads so that I could
    make the test fail and debug that, but I got into
    >>>> >>
    >>>> >> java.lang.IllegalArgumentException: PCollectionNodes
    [PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
    PCollection=unique_name: "PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
    coder_id: "IterableCoder"
    is_bounded: BOUNDED
    windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
    }] were consumed but never produced
    >>>> >>
    >>>> >>
    >>>> >> which gave me the last knock-out. :)
    >>>> >>
    >>>> >> My current impression is that starting from Beam 2.25.0,
    portable FlinkRunner is not able to read from Kafka. Could someone
    give me a hint about what is wrong with using
    convertReadBasedSplittableDoFnsToPrimitiveReads in the test [3]?
    >>>> >>
    >>>> >>   Jan
    >>>> >>
    >>>> >> [1] https://issues.apache.org/jira/browse/BEAM-11991
    >>>> >>
    >>>> >> [2] https://issues.apache.org/jira/browse/BEAM-11998
    >>>> >>
    >>>> >> [3] https://github.com/apache/beam/pull/15181
