Hi,

there has been some development around this [1], which essentially concludes that currently this feature can be safely supported only by direct runner, flink runner (both batch and streaming, non-portable only) and spark (batch, legacy only). This is due to the fact, that time sorting relies heavily on timers to be strictly ordered. Failing to do so might result in unpredictable data loss, due to window-cleanup of state occurring prior to all elements being emitted (note that this generally might happen even to current user pipelines!). I can link issues [2], [3] and [4] to [5], but the question is, with only so few runners being able to support this, what should be the best way to incorporate this into any upcoming release (I'm assuming that this will pass a vote, which is not known yet)? I'd say that the best way would be the affected runners to fail to execute the pipeline until the respective issues are resolved. Another option would be to block this until the issues are resolved in runners, but that might delay the availability of this feature for some unknown time.

Thanks for any opinions,

Jan

[1] https://lists.apache.org/thread.html/71a8f48ca518f1f2e6e9b1284114624670884775d209b0097f68264b@%3Cdev.beam.apache.org%3E

[2] https://issues.apache.org/jira/browse/BEAM-8459

[3] https://issues.apache.org/jira/browse/BEAM-8460

[4] https://issues.apache.org/jira/browse/BEAM-8543.

[5] https://issues.apache.org/jira/browse/BEAM-8550

On 10/31/19 2:59 PM, Jan Lukavský wrote:
Hi,

as a follow-up from previous design draft, I'd like to promote the document [1] and associated PR [2] to proposal.

The PR contains working implementation for:

 - non-portable batch flink and batch spark (legacy)

 - all non-portable streaming runners that use StatefulDoFnRunner (direct, samza, dataflow)

 - portable flink (batch, streaming)

There are still some unresolved issues:

 a) no way to specify allowed lateness (currently is simply zero, late data should be dropped)

 b) need a way to specify user UDF for extracting timestamp (according to [3] it would be useful to have that option)

 c) need to add more tests (e.g. late data)

The plan is to postpone resolution of issues a) and b) after the proposal is merged. I'd like to gather some more feedback on the proposal, iterate over that again, add more tests and then pass this to a vote.

Unrelated - during implementation a bug [4] in Samza runner was found.

Looking forward to any comments!

Jan

[1] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/

[2] https://github.com/apache/beam/pull/8774

[3] https://lists.apache.org/thread.html/813429e78b895a336d4f5507e3b2330282e2904fa25d52d6d441741a@%3Cdev.beam.apache.org%3E

[4] https://issues.apache.org/jira/browse/BEAM-8529


On 5/23/19 4:10 PM, Jan Lukavský wrote:
Hi,

I have written a very brief draft of how it might be possible to implement @RequireTimeSortedInput discussed in [1]. I see the document [2] a starting point for a discussion. There are several open questions, which I believe can be resolved by this great community. :-)

Jan

[1] https://lists.apache.org/thread.html/4609a1bb1662690d67950e76d2f1108b51327b8feaf9580de659552e@%3Cdev.beam.apache.org%3E

[2] https://docs.google.com/document/d/1ObLVUFsf1NcG8ZuIZE4aVy2RYKx2FfyMhkZYWPnI9-c/

Reply via email to