I didn't want to say that Flink should not support SDF. I just don't see any benefit of it for a native streaming source - like Kafka - without the ability to use dynamic splitting. The potential benefits of composability and extensibility do not apply here. Yes, it would be good to have as few source transforms as possible. And another yes, there probably isn't anything that would fundamentally prevent Flink from correctly supporting SDF. On the other hand, the current state is such that we cannot use KafkaIO in Flink. I think we should fix this by the shortest possible path, because the technically correct solution is currently unknown (at least to me; if anyone can give pointers about how to fix the SDF, I'd be grateful).

I still think that enabling a runner to support Read natively, when appropriate, has value in itself. And it requires the SDK Coders to be 'known' to the runner - at least that was the result of my tests.
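
For illustration, a minimal sketch of what 'known' means here - all names are hypothetical (knownCoderUrns stands in for whatever set the runner actually consults, e.g. the URNs exposed by ModelCoders in runners-core-construction-java):

  import java.util.Set;

  // Hypothetical sketch: how a Java runner might decide whether it can
  // execute a transform like Read natively. Names are illustrative only.
  class CoderSupport {
    // Coder URNs the runner understands (the model coders plus, as argued
    // above, the coders of the runner's own SDK language).
    private final Set<String> knownCoderUrns;

    CoderSupport(Set<String> knownCoderUrns) {
      this.knownCoderUrns = knownCoderUrns;
    }

    // If a transform's output coder is unknown, the runner has to treat
    // elements as opaque length-prefixed bytes and cannot execute the
    // transform natively; with Java SDK coders in the known set, a Java
    // runner can run Read itself and still hand the harness correct bytes.
    boolean canExecuteNatively(String outputCoderUrn) {
      return knownCoderUrns.contains(outputCoderUrn);
    }
  }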

On 7/25/21 8:31 PM, Chamikara Jayalath wrote:


On Sun, Jul 25, 2021 at 11:09 AM Jan Lukavský <je...@seznam.cz> wrote:

    In general, language-neutral APIs and protocols are a key feature
    of portable Beam.
    Yes, sure, that is well understood. But language-neutral APIs
    require a language-neutral environment. That is why the portable
    Pipeline representation is built around protocol buffers and gRPC -
    that is truly language-neutral. Once we implement something around
    that - as in the case of ModelCoders.java - we use a specific
    language, and the language-neutral part is already gone. The
    decision whether to include same-language SDK coders in such a
    language-specific object plays no role in the fact that it is
    already language-specific.

    Not all runners are implemented using Java. For example, the
    portable DirectRunner (FnAPI runner) is implemented using Python
    and Dataflow is implemented using C++. Such runners will not be
    able to do this.
    Yes, I'm aware of that, and that is why I said "any Java native
    runner". It is true that non-Java runners *must* (as long as we
    don't include Read in the SDK harness) resort to expanding it to
    SDF. That is why use_deprecated_read is an invalid setting for
    such runners and should be handled accordingly.

    Similarly, I think there were previous discussions related to
    using SDF as the source framework for portable runners.
    Don't get me wrong, I'm not trying to revoke this decision. On the
    other hand, I still think that the decision whether to use the SDF
    implementation of Read should be left to the runner.

    I understand that there are some bugs related to SDF and portable
    Flink currently. How much work do you think is needed here? Will
    it be better to focus our efforts on fixing remaining issues for
    SDF and portable runners instead of supporting
    "use_deprecated_read" for that path?
    I'm not sure. I don't know portability and the SDK harness well
    enough to be able to answer this. But we should really know why we
    do that. What exactly does SDF bring to the Flink runner (and
    let's leave Flink aside - what does it bring to runners that
    cannot make use of dynamic splitting, admittedly a very cool
    feature)? Yes, supporting Java Read makes it impossible to
    implement it in Python. But practically, I think that most
    Pipelines will use x-lang for that. It makes a lot of sense to
    offload IOs to a more performant environment.


A bit old, but please see the following for the benefits of SDF and the motivation for it.

https://beam.apache.org/blog/splittable-do-fn/
https://s.apache.org/splittable-do-fn

Thanks,
Cham

     Jan

    On 7/25/21 6:54 PM, Chamikara Jayalath wrote:


    On Sun, Jul 25, 2021 at 6:33 AM Jan Lukavský <je...@seznam.cz> wrote:

        I'll start from the end.

        I don't think we should be breaking language agnostic API
        layers (for example, definition of model coders) just to
        support "use_deprecated_read".
        "Breaking" and "fixing" can only be a matter of the definition
        of the object at hand. I don't think that a Coder can be
        totally language-agnostic - yes, the mapping between the
        serialized and deserialized forms can be _defined_ in a
        language-agnostic way, but it must be _implemented_ in a
        specific language. If we choose the implementing language,
        what makes us treat SDK-specific coders defined by the SDK of
        the same language as "unknown"? It is only our decision, and
        it seems to have no practical benefit.
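
        To illustrate the defined-vs-implemented distinction: the byte
        layout of, say, a varint coder can be specified in a completely
        language-agnostic way, yet every SDK (and every runner that
        wants to produce data itself) still needs an implementation in
        its own language. A toy Java sketch in the spirit of
        beam:coder:varint:v1 - plain Java, not Beam's actual
        VarIntCoder:

          import java.io.EOFException;
          import java.io.IOException;
          import java.io.InputStream;
          import java.io.OutputStream;

          // Little-endian base-128 varint: 7 data bits per byte, high bit
          // set while more bytes follow. The *definition* is language-
          // agnostic; this *implementation* is irreducibly Java.
          final class ToyVarInt {
            static void encode(int value, OutputStream out) throws IOException {
              while ((value & ~0x7F) != 0) {
                out.write((value & 0x7F) | 0x80);
                value >>>= 7;
              }
              out.write(value);
            }

            static int decode(InputStream in) throws IOException {
              int result = 0, shift = 0, b;
              do {
                b = in.read();
                if (b < 0) throw new EOFException("truncated varint");
                result |= (b & 0x7F) << shift;
                shift += 7;
              } while ((b & 0x80) != 0);
              return result;
            }
          }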


    In general, language-neutral APIs and protocols are a key feature
    of portable Beam. See here:
    https://beam.apache.org/roadmap/portability/
    (I did not look into all the old discussions and votes related to
    this but I'm sure they are there)

        Moreover, including SDK-specific coders among the supported
        coders of the SDK's runner construction counterpart (that is,
        runners-core-construction-java for the Java SDK) is a
        necessary prerequisite for unifying "classical" and "portable"
        runners, because the runner needs to understand *all* SDK
        coders so that it can _inline_ the complete Pipeline (if the
        Pipeline SDK has the same language as the runner) instead of
        running it through the SDK harness. This need is therefore not
        specific to supporting use_deprecated_read; it is a generic
        requirement whose first manifestation happens to be the
        support of a transform that the SDK harness does not support.
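
        A minimal sketch of the check implied here - the names
        (stageCoderUrns, understoodUrns) are illustrative, not
        existing Beam APIs:

          import java.util.List;
          import java.util.Set;

          // Hypothetical sketch: a unified Java runner could inline a fused
          // stage (execute it without an SDK harness) only if it understands
          // every coder on that stage's PCollections.
          final class InliningDecision {
            static boolean canInlineStage(
                List<String> stageCoderUrns, Set<String> understoodUrns) {
              // Any unknown coder forces elements across the Fn API boundary
              // as length-prefixed bytes, so the stage must stay in the harness.
              return understoodUrns.containsAll(stageCoderUrns);
            }
          }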

        I think "use_deprecated_read" should be considered a
        stop-gap measure for Flink (and Spark ?) till we have proper
        support for SDF. In fact I don't think an arbitrary portable
        runner can support "use_deprecated_read" due to the following.
        There seems to be nothing special about Flink regarding the
        support of primitive Read. I think any Java native runner can
        implement it pretty much the same way as Flink does. The
        question is whether any other runner might want to do that.
        The problem with Flink is that


    Not all runners are implemented using Java. For example, the
    portable DirectRunner (FnAPI runner) is implemented using Python
    and Dataflow is implemented using C++. Such runners will not be
    able to do this.

         1) portable SDF seems not to work [1]

         2) even the classical Flink runner still has issues with SDF -
        there are reports of the watermark being stuck when reading
        data via SDF, which gets resolved by use_deprecated_read

         3) Flink actually gains nothing from SDF, because it cannot
        make use of dynamic splitting, so SDF brings only an
        implementation burden without any practical benefit

    Similarly, I think there were previous discussions related to
    using SDF as the source framework for portable runners.
    I understand that there are some bugs related to SDF and portable
    Flink currently. How much work do you think is needed here? Will
    it be better to focus our efforts on fixing remaining issues for
    SDF and portable runners instead of supporting
    "use_deprecated_read" for that path? Note that I'm fine with
    fixing any issues related to "use_deprecated_read" for classic
    (non-portable) Flink, but I think you are trying to use x-lang and
    hence probably need portable Flink.

    Thanks,
    Cham

        I think that we should revisit the decision to deprecate
        Read - if we can implement it via SDF, what is the reason to
        forbid a runner from making use of a simpler implementation?
        The expansion of Read might be runner-dependent; that is
        something we do all the time, or am I missing something?

         Jan

        [1] https://issues.apache.org/jira/browse/BEAM-10940

        On 7/25/21 1:38 AM, Chamikara Jayalath wrote:
        I think we might be going down a bit of a rabbit hole with
        the support for "use_deprecated_read" for portable Flink :)

        I think "use_deprecated_read" should be considered a
        stop-gap measure for Flink (and Spark ?) till we have proper
        support for SDF. In fact I don't think an arbitrary portable
        runner can support "use_deprecated_read" due to the following.

        (1) SDK Harness is not aware of
        BoundedSource/UnboundedSource. The only source framework the
        SDK Harness is aware of is SDF.
        (2) Invoking BoundedSource/UnboundedSource is not a part of
        the Fn API.
        (3) A non-Java Beam portable runner will probably not be able
        to directly invoke legacy Read transforms the way Flink does
        today.

        I don't think we should be breaking language agnostic API
        layers (for example, definition of model coders) just to
        support "use_deprecated_read".

        Thanks,
        Cham

        On Sat, Jul 24, 2021 at 11:50 AM Jan Lukavský <je...@seznam.cz> wrote:

            On 7/24/21 12:34 AM, Robert Bradshaw wrote:

            >   On Thu, Jul 22, 2021 at 10:20 AM Jan Lukavský <je...@seznam.cz> wrote:
            >> Hi,
            >>
            >> this was a ride. But I managed to get that working.
            I'd like to discuss two points, though:
            >>
            >>   a) I had to push Java coders to ModelCoders for
            Java (which makes sense to me, but is that correct?).
            See [1]. It is needed so that the Read transform
            (executed directly in the TaskManager) can correctly
            communicate with the Java SDK harness using custom coders
            (which is tested here [2]).
            > I think the intent was that ModelCoders represent the
            > set of language-agnostic coders in the model, though I
            > have to admit I've always been a bit fuzzy on when a
            > coder must or must not be in that list.
            I think that this definition works as long as the runner
            does not itself interfere with the Pipeline. Once the
            runner starts producing data (by itself, not via
            SdkHarnessClient), it becomes part of the environment, and
            therefore it should understand its own Coders. I'd propose
            to define "model coders" as the Coders that the SDK is
            able to understand, which then works naturally for the
            ModelCoders located in "core-construction-java" - it
            should understand Java SDK Coders.
            >
            >>   b) I'd strongly prefer if we moved the handling of
            use_deprecated_read from outside of the Read PTransform
            directly into the expand method, see [3]. Though this is
            not needed for the Read on Flink to work, it seems cleaner.
            >>
            >> WDYT?
            > The default value of use_deprecated_read should depend
            > on the runner (e.g. some runners don't work well with
            > it, others require it). As such, it should not be
            > visible to the PTransform's expand.
            I think we should know what the expected outcome is. If a
            runner does not support primitive Read (and therefore
            use_deprecated_read), what should we do if such an
            experiment is set? Should the Pipeline fail, or should it
            be silently ignored? I think that we should fail, because
            the user expects something that cannot be fulfilled.
            Therefore, we have two options - handling the experiment
            explicitly in runners that do not support it, or handling
            it explicitly in all cases (both supported and
            unsupported). The latter is when we force runners to call
            an explicit conversion method (convertPrimitiveRead....).
            Every runner that does not support primitive Read must
            handle the experiment either way, because otherwise the
            experiment would simply be silently ignored, which is not
            exactly user-friendly.
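
            For the fail-fast option, a minimal sketch of what I mean
            - ExperimentalOptions.hasExperiment is the real helper,
            while the surrounding class and the error message are
            purely illustrative:

              import org.apache.beam.sdk.options.ExperimentalOptions;
              import org.apache.beam.sdk.options.PipelineOptions;

              // Sketch: a runner without primitive Read support rejects the
              // experiment loudly instead of silently ignoring it.
              final class ReadExperimentValidation {
                static void validate(PipelineOptions options) {
                  if (ExperimentalOptions.hasExperiment(options, "use_deprecated_read")) {
                    throw new IllegalArgumentException(
                        "use_deprecated_read is set, but this runner cannot execute "
                            + "the primitive Read transform; remove the experiment or "
                            + "use a runner with native Read support.");
                  }
                }
              }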
            >
            >>   Jan
            >>
            >> [1] https://github.com/apache/beam/pull/15181/commits/394ddc3fdbaacc805d8f7ce02ad2698953f34375
            >>
            >> [2] https://github.com/apache/beam/pull/15181/files#diff-b1ec58edff6c096481ff336f6fc96e7ba5bcb740dff56c72606ff4f8f0bf85f3R201
            >>
            >> [3] https://github.com/apache/beam/pull/15181/commits/f1d3fd0217e5513995a72e92f68fe3d1d665c5bb
            >>
            >> On 7/18/21 6:29 PM, Jan Lukavský wrote:
            >>
            >> Hi,
            >>
            >> I was debugging the issue, and it relates to pipeline
            fusion - it seems that the primitive Read transform gets
            fused and then is 'missing' as a source. I'm a little lost
            in the code, but the strangest parts are these:
            >>
            >>   a) I tried to reject fusion of primitive Read by
            adding GreedyPCollectionFusers::cannotFuse for
            PTransformTranslation.READ_TRANSFORM_URN to
            GreedyPCollectionFusers.URN_FUSIBILITY_CHECKERS, but
            that didn't change the exception
            >>
            >>   b) I tried adding Reshuffle.viaRandomKey between
            Read and PAssert (see the sketch below this list), but
            that didn't change it either
            >>
            >>   c) when I run a portable Pipeline with
            use_deprecated_read on Flink, it actually runs (it fails
            once it actually reads any data, but if the input is
            empty, the job runs), so it does not hit the same issue,
            which is a mystery to me
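            >>
            >> For reference, a sketch of the attempt from b) - inside a
            >> test method, with CountingSource standing in for the source
            >> used in the actual test:
            >>
            >>   import org.apache.beam.sdk.Pipeline;
            >>   import org.apache.beam.sdk.io.CountingSource;
            >>   import org.apache.beam.sdk.io.Read;
            >>   import org.apache.beam.sdk.testing.PAssert;
            >>   import org.apache.beam.sdk.transforms.Reshuffle;
            >>   import org.apache.beam.sdk.values.PCollection;
            >>
            >>   // Break fusion between the primitive Read and PAssert by
            >>   // inserting a reshuffle between them.
            >>   Pipeline p = Pipeline.create();
            >>   PCollection<Long> result =
            >>       p.apply(Read.from(CountingSource.upTo(10L)))
            >>           .apply(Reshuffle.viaRandomKey());
            >>   PAssert.that(result)
            >>       .containsInAnyOrder(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L);
            >>   p.run().waitUntilFinish();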
            >>
            >> If anyone has any pointers that I can investigate,
            I'd be really grateful.
            >>
            >> Thanks in advance,
            >>
            >>   Jan
            >>
            >>
            >>
            >> On 7/16/21 2:00 PM, Jan Lukavský wrote:
            >>
            >> Hi,
            >>
            >> I hit another issue with the portable Flink runner.
            Long story short - reading from Kafka is not working in
            portable Flink. First I had to solve issues with the
            expansion service configuration (the ability to add the
            use_deprecated_read option), because the portable Flink
            runner has issues with SDF [1], [2]. After being able to
            inject use_deprecated_read into the expansion service, I
            was able to get an execution DAG that has the
            UnboundedSource, but then more and more issues appeared
            (probably related to a missing LengthPrefixCoder somewhere
            - maybe at the output of the primitive Read). I wanted to
            create a test for it, and I found out that there actually
            is a ReadSourcePortableTest in FlinkRunner, but _it tests
            nothing_. The problem is that Read is transformed to SDF,
            so this test tests the SDF, not the Read transform. As a
            result, the Read transform does not work.
            >>
            >> I tried using
            convertReadBasedSplittableDoFnsToPrimitiveReads so that
            I could make the test fail and debug it, but I ran into
            >>
            >> java.lang.IllegalArgumentException: PCollectionNodes
            >> [PCollectionNode{id=PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output,
            >> PCollection=unique_name: "PAssert$0/GroupGlobally/ParDo(ToSingletonIterables)/ParMultiDo(ToSingletonIterables).output"
            >> coder_id: "IterableCoder"
            >> is_bounded: BOUNDED
            >> windowing_strategy_id: "WindowingStrategy(GlobalWindows)"
            >> }] were consumed but never produced
            >>
            >>
            >> which gave me the last knock-out. :)
            >>
            >> My current impression is that starting from Beam
            2.25.0, the portable FlinkRunner is not able to read from
            Kafka. Could someone give me a hint about what is wrong
            with using
            convertReadBasedSplittableDoFnsToPrimitiveReads in the
            test [3]?
            >>
            >>   Jan
            >>
            >> [1] https://issues.apache.org/jira/browse/BEAM-11991
            >>
            >> [2] https://issues.apache.org/jira/browse/BEAM-11998
            >>
            >> [3] https://github.com/apache/beam/pull/15181
