TL;DR I am not suggesting that you must implement this for any runner. I'm
afraid I do have to propose this change be rolled back before release
2.21.0 unless we fix this. I think the fix is easily achieved.

Clarifications inline.

On Fri, Feb 7, 2020 at 11:20 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi Kenn,
>
> I think that this approach is not well maintainable and doesn't scale.
> Main reasons:
>
>  a) modifying core has by definition some impact on runners, so modifying
> core would imply necessity to modify all runners
>
My concern is not about all changes to "core" but only changes to the
model, which should be extraordinarily rare. They must receive extreme
scrutiny and require a very high level of consensus. It is true that every
runner needs to either correctly execute or refuse to execute every
pipeline, to the extent possible. For the case we are talking about it is
very easy to meet this requirement.

>  b) having to implement core feature for all existing runners will make any
> modification to core prohibitively expensive
>
No one is suggesting this. I am saying that you need to write the 1 line
that I linked to "if (usesRequiresTimeSortedInput) then reject pipeline" so
the runner fails before it begins processing data, potentially consuming
non-replayable messages.
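
To make the shape of that check concrete, here is a minimal sketch. The
types are hypothetical stand-ins written for this email, not Beam classes;
a real runner would read the flag from the DoFn signature or the pipeline
proto instead.

```java
import java.util.List;

// Illustration only: Step and validate() are hypothetical, not Beam APIs.
public class RejectBeforeExecution {

  /** Minimal model of a pipeline step and whether it declares the requirement. */
  static final class Step {
    final String name;
    final boolean requiresTimeSortedInput;

    Step(String name, boolean requiresTimeSortedInput) {
      this.name = name;
      this.requiresTimeSortedInput = requiresTimeSortedInput;
    }
  }

  /** Throws before any data is read if a step needs a feature the runner lacks. */
  static void validate(List<Step> steps, boolean runnerSupportsTimeSortedInput) {
    for (Step step : steps) {
      if (step.requiresTimeSortedInput && !runnerSupportsTimeSortedInput) {
        throw new UnsupportedOperationException(
            "Step '" + step.name + "' requires time-sorted input, "
                + "which this runner does not support");
      }
    }
  }

  public static void main(String[] args) {
    List<Step> steps =
        List.of(new Step("Read", false), new Step("SortedStatefulFn", true));

    validate(steps, true); // supporting runner: passes silently

    try {
      validate(steps, false); // non-supporting runner: rejected up front
    } catch (UnsupportedOperationException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

The point is only that the check runs during translation, so nothing is
consumed from a source before the failure.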


>  c) even if we accept this, there can be runners that are outside of beam
> repo (or even closed source!)
>
Indeed. And those runners need time to adapt to the new proto fields. I did
not mention it this time, because the proto is not considered stable. But
very soon it will be. At that point additions like this will have to be
fully specified and added to the proto long before they are enabled for
use. That way all runners can adjust. The proper order is (1) add the model
feature, (2) make runners reject it as unsupported, (3) add functionality to
the SDK, (4) add it to some runners and enable it.


> Therefore I think, that the correct and scalable approach would be to
> split this into several pieces:
>
>  1) define pipeline requirements (this is pretty much similar to how we
> currently scope @Category(ValidatesRunner.class) tests
>
>  2) let pipeline infer its requirements prior to being translated via
> runner
>
>  3) runner can check the set of required features and their support and
> reject the pipeline if some feature is missing
>
This is exactly what happens today, but was not included in your change.
The pipeline proto (or the Java pipeline object) clearly contains all the
needed information. Whether the pipeline summarizes it or the runner
implements a trivial PipelineVisitor is not important.
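
Either way the check reduces to a set difference between what the pipeline
requires and what the runner supports. A rough sketch follows; the Feature
names and the missing() helper are made up for illustration and do not
correspond to anything in the Beam codebase.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustration only: Feature and missing() are hypothetical, not Beam APIs.
public class RequirementCheck {

  /** Hypothetical feature names chosen for this example. */
  enum Feature { STATE, TIMERS, TIME_SORTED_INPUT }

  /** Returns the features the pipeline requires that the runner lacks. */
  static Set<Feature> missing(Set<Feature> required, Set<Feature> supported) {
    EnumSet<Feature> result = EnumSet.noneOf(Feature.class);
    result.addAll(required);
    result.removeAll(supported);
    return result;
  }

  public static void main(String[] args) {
    Set<Feature> required = EnumSet.of(Feature.STATE, Feature.TIME_SORTED_INPUT);
    Set<Feature> supported = EnumSet.of(Feature.STATE, Feature.TIMERS);

    Set<Feature> gap = missing(required, supported);
    if (!gap.isEmpty()) {
      // A runner would refuse the pipeline here, before execution starts.
      System.out.println("Rejecting pipeline, unsupported features: " + gap);
    }
  }
}
```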

> This could even replace the annotations used in validates runner tests,
> because each runner would simply execute all tests it has enough features
> to run.
>
What you have described is exactly what happens today.


> But as I mentioned - this is pretty much deep change. I don't know how to
> safely do this for current runners, but to actually implement the feature
> (it seems to be to me nearly equally complicated to fail pipeline in batch
> case and to actually implement the sorting).
>
Indeed. This feature hasn't really got consensus. The proposal thread [1]
never really concluded affirmatively. The [VOTE] thread [2] indicates a
clear *lack* of consensus, with all people who weighed in asking to raise
awareness and build more support and consensus. Robert made the good point
that if it is (a) useful and (b) not easy for users to do themselves, then
we should consider it, even if most people here are not interested in the
feature. So that is the closest thing to approval that this feature has.
But getting more people interested and on board would get better feedback
and achieve a better result for our users.

And as a final note, the PR was not reviewed by the core people who built
out state & timers, nor those who built out DoFn annotation systems, nor
any runner author, nor those working on the Beam model protos. You really
should have gotten most of these people involved. They would likely have
caught the issues described here.

The specific action that I am proposing is to implement the one-liner
described above in all runners. It might be best to roll back and proceed
with steps 1-4 I outlined above, so we can be sure things are proceeding well.

Kenn

[1]
https://lists.apache.org/thread.html/b91f96121d37bf16403acbd88bc264cf16e40ddb636f0435276e89aa%40%3Cdev.beam.apache.org%3E
[2]
https://lists.apache.org/thread.html/91b87940ba7736f9f1021928271a0090f8a0096e5e3f9e52de89acf2%40%3Cdev.beam.apache.org%3E

> It would be super cool if anyone would be interested in implementing this
> in runners that don't currently support it. A side note - currently the
> annotation is not supported by all streaming runners due to missing
> guarantees for timers ordering (which can lead to data loss). I think I
> have found a solution to this, see [1], but I'd like to be 100% sure,
> before enabling the support (I'm not sure what is the impact of mis-ordered
> timers on output timestamps, and so on, and so forth).
>
> Jan
>
> [1]
> https://github.com/apache/beam/pull/10795/files#diff-11a02ba72f437b89e35f7ad37102dfd1R209
> On 2/7/20 7:53 PM, Kenneth Knowles wrote:
>
> I see. It is good to see that the pipeline will at least fail. However,
> the expected approach here is that the pipeline is rejected prior to
> execution. That is a primary reason for our annotation-driven API style; it
> allows much better "static" analysis by a runner, so we don't have to wait
> and fail late. Here is an example:
> https://github.com/apache/beam/blob/master/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java#L1940
>
> Kenn
>
> On Thu, Feb 6, 2020 at 11:03 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Kenn,
>>
>> that should not be the case. Care was taken to fail streaming pipeline
>> which needs this ability and the runner doesn't support this [1]. It is
>> true, however, that a batch pipeline will not fail, because there is no
>> generic (runner agnostic) way of supporting this transform in batch case
>> (which is why the annotation was needed). Failing batch pipelines in this
>> case would mean runners have to understand this annotation, which is pretty
>> much close to implementing this feature as a whole.
>>
>> This applies generally to any core functionality, it might take some time
>> before runners fully support this. I don't know how to solve it, maybe add
>> record to capability matrix? I can imagine a fully generic solution
>> (runners might publish their capabilities and pipeline might be validated
>> against these capabilities at pipeline build time), but that is obviously
>> out of scope of the annotation.
>>
>> Jan
>>
>> [1]
>> https://github.com/apache/beam/blob/master/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java#L150
>> On 2/7/20 1:01 AM, Kenneth Knowles wrote:
>>
>> There is a major problem with this merge: the runners that do not support
>> it do not reject pipelines that need this feature. They will silently
>> produce the wrong answer, causing data loss.
>>
>> Kenn
>>
>> On Thu, Feb 6, 2020 at 3:24 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> the PR was merged to master and a few follow-up issues were created,
>>> mainly [1] and [2]. I didn't find any reference to SortedMapState in
>>> JIRA, is there any tracking issue for that that I can link to? I also
>>> added link to design document here [3].
>>>
>>> [1] https://issues.apache.org/jira/browse/BEAM-9256
>>>
>>> [2] https://issues.apache.org/jira/browse/BEAM-9257
>>>
>>> [3] https://cwiki.apache.org/confluence/display/BEAM/Design+Documents
>>>
>>> On 1/30/20 1:39 PM, Jan Lukavský wrote:
>>> > Hi,
>>> >
>>> > PR [1] (issue [2]) went through code review, and according to [3] seems
>>> > to me to be ready for merge. Current state of the implementation is
>>> > that it is supported only for direct runner, legacy flink runner
>>> > (batch and streaming) and legacy spark (batch). It could be supported
>>> > by all other (streaming) runners using StatefulDoFnRunner, provided
>>> > the runner can make guarantees about ordering of timer firings (which
>>> > is unfortunately the case only for legacy flink and direct runner, at
>>> > least for now - related issues are mentioned multiple times on other
>>> > threads). Implementation for other batch runners should be as
>>> > straightforward as adding sorting by event timestamp before stateful
>>> > dofn (in case where the runner doesn't sort already - e.g. Dataflow -
>>> > in which case the annotation can be simply ignored - hence support for
>>> > batch Dataflow seems to be a no-op).
>>> >
>>> > There has been some slight controversy about this feature, but current
>>> > feature proposing and implementing guidelines do not cover how to
>>> > resolve those, so I'm using this opportunity to let the community
>>> > know, that there is a plan to merge this feature, unless there is some
>>> > veto (please provide specific reasons for that in that case). The plan
>>> > is to merge this in the second part of next week, unless there is a
>>> > veto.
>>> >
>>> > Thanks,
>>> >
>>> >  Jan
>>> >
>>> > [1] https://github.com/apache/beam/pull/8774
>>> >
>>> > [2] https://issues.apache.org/jira/browse/BEAM-8550
>>> >
>>> > [3] https://beam.apache.org/contribute/committer-guide/
>>> >
>>>
>>
