Hi Kenn,

that should not be the case. Care was taken to fail any streaming pipeline that needs this ability when the runner doesn't support it [1]. It is true, however, that a batch pipeline will not fail, because there is no generic (runner-agnostic) way of supporting this transform in the batch case (which is why the annotation was needed). Failing batch pipelines in this case would mean runners have to understand this annotation, which is pretty close to implementing the feature as a whole.

This applies generally to any core functionality: it might take some time before runners fully support it. I don't know how to solve it; maybe add a record to the capability matrix? I can imagine a fully generic solution (runners might publish their capabilities and the pipeline might be validated against them at pipeline build time), but that is obviously out of scope for the annotation.
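
To illustrate the generic idea only, here is a minimal sketch in plain Java. Everything in it is an assumption made up for illustration: the Feature enum, the class name and both methods are hypothetical and are not part of the Beam API. The runner publishes a set of capabilities, and the pipeline's required features are checked against it at build time.

```java
import java.util.EnumSet;
import java.util.Set;

final class CapabilityCheck {
    /** Hypothetical feature names, invented for this sketch (not Beam API). */
    enum Feature { STATEFUL_DOFN, TIMERS, TIME_SORTED_INPUT }

    /** Returns the required features that the runner does not declare. */
    static Set<Feature> unsupported(Set<Feature> required, Set<Feature> runnerCapabilities) {
        Set<Feature> missing = EnumSet.noneOf(Feature.class);
        missing.addAll(required);
        missing.removeAll(runnerCapabilities);
        return missing;
    }

    /** Rejects the pipeline at build time rather than letting it run incorrectly. */
    static void validate(Set<Feature> required, Set<Feature> runnerCapabilities) {
        Set<Feature> missing = unsupported(required, runnerCapabilities);
        if (!missing.isEmpty()) {
            throw new UnsupportedOperationException("Runner does not support: " + missing);
        }
    }

    public static void main(String[] args) {
        Set<Feature> runner = EnumSet.of(Feature.STATEFUL_DOFN, Feature.TIMERS);
        Set<Feature> pipeline = EnumSet.of(Feature.STATEFUL_DOFN, Feature.TIME_SORTED_INPUT);
        try {
            validate(pipeline, runner);
        } catch (UnsupportedOperationException e) {
            // The pipeline is rejected before any data is processed.
            System.out.println(e.getMessage());
        }
    }
}
```

With a check like this, a runner that has not declared a feature would reject the pipeline up front without having to understand the annotation itself.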

Jan

[1] https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150

On 2/7/20 1:01 AM, Kenneth Knowles wrote:
There is a major problem with this merge: the runners that do not support it do not reject pipelines that need this feature. They will silently produce the wrong answer, causing data loss.

Kenn

On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    the PR was merged to master and a few follow-up issues were created,
    mainly [1] and [2]. I didn't find any reference to SortedMapState in
    JIRA; is there a tracking issue for that which I can link to? I also
    added a link to the design document here [3].

    [1] https://issues.apache.org/jira/browse/BEAM-9256

    [2] https://issues.apache.org/jira/browse/BEAM-9257

    [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents

    On 1/30/20 1:39 PM, Jan Lukavský wrote:
    > Hi,
    >
    > PR [1] (issue [2]) went through code review, and according to [3] it
    > seems to me to be ready for merge. The current state of the
    > implementation is that it is supported only for the direct runner,
    > legacy Flink runner (batch and streaming) and legacy Spark (batch). It
    > could be supported by all other (streaming) runners using
    > StatefulDoFnRunner, provided the runner can make guarantees about
    > ordering of timer firings (which is unfortunately the case only for
    > legacy Flink and the direct runner, at least for now - related issues
    > are mentioned multiple times on other threads). Implementation for
    > other batch runners should be as straightforward as adding sorting by
    > event timestamp before the stateful DoFn (where the runner doesn't
    > sort already; a runner that already sorts, e.g. Dataflow, can simply
    > ignore the annotation, hence support for batch Dataflow seems to be a
    > no-op).
    >
    > There has been some slight controversy about this feature, but the
    > current guidelines for proposing and implementing features do not
    > cover how to resolve it, so I'm using this opportunity to let the
    > community know that there is a plan to merge this feature in the
    > second half of next week, unless there is a veto (in that case,
    > please provide specific reasons).
    >
    > Thanks,
    >
    >  Jan
    >
    > [1] https://github.com/apache/beam/pull/8774
    >
    > [2] https://issues.apache.org/jira/browse/BEAM-8550
    >
    > [3] https://beam.apache.org/contribute/committer-guide/
    >
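
The batch approach described in the quoted message (sort each key's elements by event timestamp before the stateful DoFn sees them) can be sketched in plain Java as follows. This is only an illustration under stated assumptions: the Reading class and the deltas method are hypothetical stand-ins, not Beam classes, and the loop merely imitates order-sensitive per-key state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

final class TimeSortedBatch {
    /** Hypothetical stand-in for one timestamped element of a single key. */
    static final class Reading {
        final long timestampMillis;
        final int value;

        Reading(long timestampMillis, int value) {
            this.timestampMillis = timestampMillis;
            this.value = value;
        }
    }

    /** Sorts one key's elements by event time, then applies order-sensitive logic. */
    static List<Integer> deltas(List<Reading> readingsForKey) {
        List<Reading> sorted = new ArrayList<>(readingsForKey);
        sorted.sort(Comparator.comparingLong((Reading r) -> r.timestampMillis));

        List<Integer> out = new ArrayList<>();
        Integer previous = null; // plays the role of per-key state
        for (Reading r : sorted) {
            if (previous != null) {
                out.add(r.value - previous); // change since the previous reading
            }
            previous = r.value;
        }
        return out;
    }

    public static void main(String[] args) {
        // Elements arrive out of order, as they may in a batch shuffle.
        List<Reading> unordered = Arrays.asList(
            new Reading(3L, 12), new Reading(1L, 10), new Reading(2L, 15));
        System.out.println(deltas(unordered)); // prints [5, -3]
    }
}
```

Without the sort, the same input would yield different deltas, which is the kind of silently wrong result discussed above.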
