[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15533061#comment-15533061 ]
ASF GitHub Bot commented on FLINK-3674:
---------------------------------------

GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/2570

[FLINK-3674] Add an interface for Time aware User Functions

This moves the event-time/processing-time trigger code from `WindowOperator` behind a well-defined interface that can be used by operators (and user functions). `InternalTimerService` is the new interface that has the same functionality that `WindowOperator` used to have. `TimerService` is the user-facing interface that does not allow dealing with namespaces/payloads and also does not allow deleting timers. There is a default implementation in `HeapInternalTimerService` that can checkpoint timers to a stream and also restore from a stream. Right now, this is managed in `AbstractStreamOperator` and operators can ask for an `InternalTimerService`.

This also adds tests for `HeapInternalTimerService`.

This adds two new user functions:
 - `TimelyFlatMapFunction`: an extension of `FlatMapFunction` that also allows querying time and setting timers
 - `TimelyCoFlatMapFunction`: the same, but for `CoFlatMapFunction`

There are two new `StreamOperator` implementations for these that use the `InternalTimerService` interface. This also adds tests for the two new operators.

This also adds the new interface `KeyContext` that is used for setting/querying the current key context for state and timers. Timers are always scoped to a key, for now.

Also, this moves the handling of watermarks for both one-input and two-input operators to `AbstractStreamOperator` so that we have a central ground truth.

There were also a bunch of small changes that I had to make to keep the main change clean. I would like to keep these as separate commits because they clearly document what was going on.

## Note for Reviewers

You should probably start from the tests, i.e. `HeapInternalTimerServiceTest`, `TimelyFlatMapTest` and `TimelyCoFlatMapTest`.
Then, the other interesting bits are `AbstractStreamOperator`, which now deals with watermarks and with checkpointing the timers, and `HeapInternalTimerService`. Keep in mind that this is just moving the code from `WindowOperator` to `HeapInternalTimerService` with some generalizations. I didn't try to optimize any of the data structures that are used.

R: @StephanEwen @StefanRRichter @kl0u for review, please 😃

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink timely-function

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/2570.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #2570

----
commit 1a09d9032bf5683a378a7fc8dc480f2d14c5924d
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-09-25T18:58:16Z

    Rename TimeServiceProvider to ProcessingTimeService

    The name clashes with the soon-to-be-added TimerService/InternalTimerService, which is meant as an interface for dealing with both processing time and event time.

    TimeServiceProvider is renamed to ProcessingTimeService to reflect the fact that it is a low-level utility that only deals with "physical" processing-time trigger tasks.

commit 758827c3c8508ef9ef2ec079ff3a8469d0096ca8
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-09-28T13:10:35Z

    Use OperatorTestHarness and TestProcessingTimeService in Kafka Tests

commit f6dd9c74dc2c58c4263fb6d084651b514898d47a
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-09-28T14:35:33Z

    Use Processing-Time Service of TestHarness in WindowOperatorTest

    Before, this was manually creating a TestProcessingTimeService; now, we're using the one that is there by default in OneInputStreamOperatorTestHarness.
commit 65389d66c5586e6707b7a6bf48df512354fac085
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-09-28T14:43:40Z

    Refactor OperatorTestHarness to always use TestProcessingTimeService

    Before, this allowed handing in a custom ProcessingTimeService, but in practice that was always a TestProcessingTimeService.

commit 1d013bcacc040552e5783c64d094ec309014457b
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-09-28T13:12:26Z

    Use TestHarness Processing-time Facility in BucketingSinkTest

    Before, this was manually creating a TestProcessingTimeService. Now we use the one that is there by default in OneInputStreamOperatorTestHarness.

commit eaf3dd00fefeb2487c7cafff6337123cbe42874b
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-09-28T13:32:24Z

    Use OperatorTestHarness in AlignedWindowOperator Tests

commit b597d2ef50c27554b83fddaff8873107265340d4
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-09-29T14:04:29Z

    Refactor Operator TestHarnesses to use Common Base Class

    This also introduces KeyedTwoInputStreamOperatorTestHarness, which is similar to KeyedOneInputStreamOperatorTestHarness.

commit 9b5b07ce97b31661ac5917c51e449ab0a85dbb58
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2016-09-26T14:21:51Z

    [FLINK-3674] Add an interface for Time aware User Functions

    This moves the event-time/processing-time trigger code from WindowOperator behind a well-defined interface that can be used by operators (and user functions).

    InternalTimerService is the new interface that has the same functionality that WindowOperator used to have. TimerService is the user-facing interface that does not allow dealing with namespaces/payloads and also does not allow deleting timers. There is a default implementation in HeapInternalTimerService that can checkpoint timers to a stream and also restore from a stream. Right now, this is managed in AbstractStreamOperator and operators can ask for an InternalTimerService.
    This also adds tests for HeapInternalTimerService.

    This adds two new user functions:
     - TimelyFlatMapFunction: an extension of FlatMapFunction that also allows querying time and setting timers
     - TimelyCoFlatMapFunction: the same, but for CoFlatMapFunction

    There are two new StreamOperator implementations for these that use the InternalTimerService interface. This also adds tests for the two new operators.

    This also adds the new interface KeyContext that is used for setting/querying the current key context for state and timers. Timers are always scoped to a key, for now.

    Also, this moves the handling of watermarks for both one-input and two-input operators to AbstractStreamOperator so that we have a central ground truth.

----

> Add an interface for Time aware User Functions
> ----------------------------------------------
>
>                 Key: FLINK-3674
>                 URL: https://issues.apache.org/jira/browse/FLINK-3674
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>
> I suggest adding an interface that UDFs can implement, which will let them be
> notified upon watermark updates.
> Example usage:
> {code}
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.streaming.api.watermark.Watermark;
>
> // Proposed callback interface: the UDF is notified of every watermark update.
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
>
> // Example UDF that tags each record with the latest watermark it has seen.
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
>     private long currentEventTime = Long.MIN_VALUE;
>
>     @Override
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>
>     @Override
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
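The pull request moves the timer logic from `WindowOperator` into `HeapInternalTimerService`, with timers always scoped to a key and event-time timers firing once the watermark passes them. The following is a minimal, hypothetical sketch of that idea in plain Java, not Flink's actual `InternalTimerService`/`TimerService` API: the class `SimpleTimerServiceSketch` and its methods are invented for illustration, keys are assumed to be `String`s, and the choice of a timestamp-ordered priority queue plus a deduplication set is an assumption (the PR does not describe its data structures). Namespaces, processing-time timers, and checkpointing are left out.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.function.BiConsumer;

/**
 * Hypothetical, simplified key-scoped event-time timer service.
 * Illustrative only; this is NOT Flink's HeapInternalTimerService.
 */
public class SimpleTimerServiceSketch {

    /** A timer is identified by the key it is scoped to and its timestamp. */
    private static final class Timer {
        final String key;
        final long timestamp;

        Timer(String key, long timestamp) {
            this.key = key;
            this.timestamp = timestamp;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Timer)) {
                return false;
            }
            Timer other = (Timer) o;
            return timestamp == other.timestamp && key.equals(other.key);
        }

        @Override
        public int hashCode() {
            return Objects.hash(key, timestamp);
        }
    }

    // Assumed design: min-heap ordered by timestamp, so the earliest timer is at the head.
    private final PriorityQueue<Timer> queue =
            new PriorityQueue<>((a, b) -> Long.compare(a.timestamp, b.timestamp));
    // Assumed design: set that deduplicates timers registered twice for the same key and time.
    private final Set<Timer> registered = new HashSet<>();
    private long currentWatermark = Long.MIN_VALUE;

    /** Registers an event-time timer for the given key; duplicates are ignored. */
    public void registerEventTimeTimer(String key, long timestamp) {
        Timer timer = new Timer(key, timestamp);
        if (registered.add(timer)) {
            queue.add(timer);
        }
    }

    /** Advances the watermark and fires every timer with timestamp <= watermark, earliest first. */
    public void advanceWatermark(long watermark, BiConsumer<String, Long> onEventTime) {
        currentWatermark = watermark;
        while (!queue.isEmpty() && queue.peek().timestamp <= watermark) {
            Timer timer = queue.poll();
            registered.remove(timer);
            // The fired timer's key acts as the "current key" for the callback.
            onEventTime.accept(timer.key, timer.timestamp);
        }
    }

    public long currentWatermark() {
        return currentWatermark;
    }

    public static void main(String[] args) {
        SimpleTimerServiceSketch timers = new SimpleTimerServiceSketch();
        timers.registerEventTimeTimer("a", 10L);
        timers.registerEventTimeTimer("b", 5L);
        timers.registerEventTimeTimer("a", 10L); // duplicate, ignored
        timers.registerEventTimeTimer("c", 20L);

        List<String> fired = new ArrayList<>();
        timers.advanceWatermark(12L, (key, ts) -> fired.add(key + "@" + ts));
        System.out.println(fired); // prints [b@5, a@10]
    }
}
```

Compared with the `onWatermark` callback proposed in the issue, a key-scoped timer lets a function ask to be called back for one specific key at one specific time, instead of being notified of every watermark and tracking per-key deadlines itself.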