I see. It is good to see that the pipeline will at least fail. However, the
expect approach here is that the pipeline is rejected prior to execution.
That is a primary reason for our annotation-driven API style; it allows
much better "static" analysis by a runner, so we don't have to wait and
fail late. Here is an example:
https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940

Kenn

On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Kenn,
>
> that should not be the case. Care was taken to fail streaming pipeline
> which needs this ability and the runner doesn't support this [1]. It is
> true, however, that a batch pipeline will not fail, because there is no
> generic (runner agnostic) way of supporting this transform in batch case
> (which is why the annotation was needed). Failing batch pipelines in this
> case would mean runners have to understand this annotation, which is pretty
> much close to implementing this feature as a whole.
>
> This applies generally to any core functionality, it might take some time
> before runners fully support this. I don't know how to solve it, maybe add
> record to capability matrix? I can imagine a fully generic solution
> (runners might publish their capabilities and pipeline might be validated
> against these capabilities at pipeline build time), but that is obviously
> out of scope of the annotation.
>
> Jan
>
> [1]
> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
> On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>
> There is a major problem with this merge: the runners that do not support
> it do not reject pipelines that need this feature. They will silently
> produce the wrong answer, causing data loss.
>
> Kenn
>
> On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi,
>>
>> the PR was merged to master and a few follow-up issues, were created,
>> mainly [1] and [2]. I didn't find any reference to SortedMapState in
>> JIRA, is there any tracking issue for that that I can link to? I also
>> added link to design document here [3].
>>
>> [1] https://issues.apache.org/jira/browse/BEAM-9256
>>
>> [2] https://issues.apache.org/jira/browse/BEAM-9257
>>
>> [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>
>> On 1/30/20 1:39 PM, Jan Lukavský wrote:
>> > Hi,
>> >
>> > PR [1] (issue [2]) went though code review, and according to [3] seems
>> > to me to be ready for merge. Current state of the implementation is
>> > that it is supported only for direct runner, legacy flink runner
>> > (batch and streaming) and legacy spark (batch). It could be supported
>> > by all other (streaming) runners using StatefulDoFnRunner, provided
>> > the runner can make guarantees about ordering of timer firings (which
>> > is unfortunately the case only for legacy flink and direct runner, at
>> > least for now - related issues are mentioned multiple times on other
>> > threads). Implementation for other batch runners should be as
>> > straightforward as adding sorting by event timestamp before stateful
>> > dofn (in case where the runner doesn't sort already - e.g. Dataflow -
>> > in which case the annotation can be simply ignored - hence support for
>> > batch Dataflow seems to be a no-op).
>> >
>> > There has been some slight controversy about this feature, but current
>> > feature proposing and implementing guidelines do not cover how to
>> > resolve those, so I'm using this opportunity to let the community
>> > know, that there is a plan to merge this feature, unless there is some
>> > veto (please provide specific reasons for that in that case). The plan
>> > is to merge this in the second part of next week, unless there is a
>> veto.
>> >
>> > Thanks,
>> >
>> >  Jan
>> >
>> > [1] https://github.com/apache/beam/pull/8774
>> >
>> > [2] https://issues.apache.org/jira/browse/BEAM-8550
>> >
>> > [3] https://beam.apache.org/contribute/committer-guide/
>> >
>>
>

Reply via email to