[
https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14610400#comment-14610400
]
ASF GitHub Bot commented on FLINK-1967:
---------------------------------------
GitHub user aljoscha opened a pull request:
https://github.com/apache/flink/pull/879
[FLINK-1967] Introduce (Event)time in Streaming
(This PR consists of other commits because of bugs discovered while writing
tests for the new feature and existing features.)
This is the first step towards supporting proper event-time windowing.
## [FLINK-1967] Introduce (Event)time in Streaming
This introduces an additional timestamp field in StreamRecord of type
org.joda.time.Instant. When using a SourceFunction, the timestamp is
automatically set to Instant.now() upon element emission. The timestamp
can be manually set using a ManualTimestampSourceFunction.
This also changes the signature of the StreamOperators. They now get
a StreamRecord instead of the unwrapped value. This is necessary for
them to access the timestamp. Previously, the StreamTask would unwrap the
value from the StreamRecord; now this is moved one layer higher.
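
As a rough illustration of the signature change described above, here is a minimal sketch of an operator that receives the wrapped record and carries the input timestamp over to its output. The names `TimestampedRecord`, `RecordOperator`, and `MapOperatorSketch` are hypothetical and are not the classes in this PR; `java.time.Instant` stands in for `org.joda.time.Instant` only to keep the example free of dependencies.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for the StreamRecord described above: a value plus a timestamp.
final class TimestampedRecord<T> {
    final T value;
    final Instant timestamp;

    TimestampedRecord(T value, Instant timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }
}

// Hypothetical operator interface: it receives the wrapped record, not the bare value,
// so the implementation can read the timestamp.
interface RecordOperator<IN, OUT> {
    void processElement(TimestampedRecord<IN> record, List<TimestampedRecord<OUT>> out);
}

// A map operator that transforms the value and keeps the timestamp of the input element.
final class MapOperatorSketch<IN, OUT> implements RecordOperator<IN, OUT> {
    private final Function<IN, OUT> mapper;

    MapOperatorSketch(Function<IN, OUT> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void processElement(TimestampedRecord<IN> record, List<TimestampedRecord<OUT>> out) {
        out.add(new TimestampedRecord<>(mapper.apply(record.value), record.timestamp));
    }
}
```

If the operator only saw the unwrapped value, it would have no way to attach the original timestamp to the element it emits.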
This also introduces watermarks to keep track of processing. At
a configurable interval the sources will emit watermarks that signify
that no records with a lower timestamp will be emitted in the future by
this source. The watermarks are broadcast. Stream inputs wait for watermark
events on all input channels and forward the watermark to the
StreamOperator once the watermark advances on all inputs. Operators are
responsible for forwarding the watermark once they know that no elements
with a previous timestamp will be emitted (this is mostly relevant for
buffering operations such as windows). Right now, all operators simply
forward the watermark they receive.
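
One way to read "once the watermark advances on all inputs" is that an input forwards the minimum watermark seen across its channels whenever that minimum increases. The sketch below follows that reading; `WatermarkTracker` is a hypothetical helper, not a class from this PR, and timestamps are plain `long` milliseconds for simplicity.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical helper: remembers the latest watermark per input channel and reports
// a combined watermark only when the minimum over all channels has advanced.
// Assumes at least one channel.
final class WatermarkTracker {
    private final long[] channelWatermarks;
    private long lastForwarded = Long.MIN_VALUE;

    WatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel. Returns the new combined watermark
    // if it advanced, or an empty result if nothing should be forwarded yet.
    OptionalLong onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Arrays.stream(channelWatermarks).min().getAsLong();
        if (min > lastForwarded) {
            lastForwarded = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

With two channels, a watermark of 100 on channel 0 forwards nothing until channel 1 has also reported; a watermark of 50 on channel 1 then yields a combined watermark of 50.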
When using a ManualTimestampSourceFunction the system does not
automatically emit timestamps; the user is required to manually emit
watermarks using the SourceContext.
No watermarks will be emitted unless
ExecutionConfig.setAutoWatermarkInterval is used to set a watermark
interval.
## [FLINK-2290/2295] Fix CoReader Event Handling and Checkpointing
This changes CoReader (now CoStreamingRecordReader) to reuse
UnionGate for the input multiplexing. This way it will not lock in on
one specific input side and read events from both input sides.
This also enables an event listener for checkpoint barriers so that the
TwoInputTask now reacts to those and correctly forwards them.
Then, this adds CoStreamCheckpointingITCase to verify that checkpointing
works in topologies with TwoInputStreamTasks.
This also adds tests for OneInputStreamTask and TwoInputStreamTask
that check whether:
- open()/close() of RichFunctions are correctly called
- Watermarks are correctly forwarded
- Supersteps/checkpoint barriers are correctly forwarded and the
blocking of inputs works correctly
## Add proper tests for Stream Operators
These test whether:
- open()/close() on RichFunctions are called
- Timestamps of emitted elements match the timestamp of the input
element
- Watermarks are correctly forwarded
## [FLINK-2301] Fix Behaviour of BarrierBuffer and add Tests
Before, a CheckpointBarrier from a more recent Checkpoint would also
trigger unblocking while waiting on an older CheckpointBarrier. Now,
a CheckpointBarrier from a newer checkpoint will unblock all channels
and start a new wait on the new Checkpoint.
The tests for OneInputStreamTask and TwoInputStreamTask check whether
the buffer behaves correctly when receiving CheckpointBarriers from more
recent checkpoints while still waiting on an older CheckpointBarrier.
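
The fixed behaviour described above can be sketched roughly as follows. `BarrierAlignerSketch` is a hypothetical, simplified model and not the BarrierBuffer from this PR: it tracks only which channels are blocked and does not buffer any records. Ignoring barriers from older checkpoints is an assumption of the sketch, as is the expectation that each channel sends at most one barrier per checkpoint.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: tracks which channels are blocked while waiting for a
// checkpoint barrier to arrive on every input channel.
final class BarrierAlignerSketch {
    private final int numChannels;
    private final Set<Integer> blocked = new HashSet<>();
    private long currentCheckpointId = -1L;

    BarrierAlignerSketch(int numChannels) {
        this.numChannels = numChannels;
    }

    // Returns true when this barrier completes the alignment for its checkpoint.
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId < currentCheckpointId) {
            return false; // barrier from an older checkpoint: ignored here (assumption)
        }
        if (checkpointId > currentCheckpointId) {
            // Barrier from a newer checkpoint: unblock all channels and
            // start a new wait on the new checkpoint.
            blocked.clear();
            currentCheckpointId = checkpointId;
        }
        blocked.add(channel);
        if (blocked.size() == numChannels) {
            blocked.clear(); // all barriers received: release every channel
            return true;
        }
        return false;
    }

    boolean isBlocked(int channel) {
        return blocked.contains(channel);
    }
}
```

For example, with two channels, a barrier for checkpoint 1 on channel 0 blocks that channel; a barrier for checkpoint 2 on channel 1 then unblocks channel 0 and blocks channel 1 until channel 0 delivers its barrier for checkpoint 2.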
## Performance Testing
To see what the impact of the changes is I ran this program:
```
env
    .addSource(new TupleSource(numGroups))
    .groupBy(0)
    .window(Time.of(5, TimeUnit.SECONDS))
    .sum(1)
    .flatten()
    .writeAsCsv(<blabla>);
```
`TupleSource` is emitting `("group x", 1)` in a loop. I used a Google
Compute Cluster with 3 Workers, DOP=6.
This is the average number of tuples in a group over a few minutes of
runtime:
- master: 1,093,726
- event-time: 949,541
- watermark every 500 ms: 887,087
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aljoscha/flink event-time
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/879.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #879
----
commit 2673e7af3cded1c7fa24a715f30e317a3640a193
Author: Aljoscha Krettek
Date: 2015-06-22T10:26:44Z
[FLINK-1967] Introduce (Event)time in Streaming
commit 9fa1a7d12b4c2b2fbf65580f5e5e0de98979ca45
Author: Aljoscha Krettek
Date: 2015-06-29T15:12:59Z
[FLINK-2290/2295] Fix CoReader Event Handling and Checkpointing
commit 3da4c4c99ccff7f18c822699c1857b7a56672878
Author: Aljoscha Krettek
Date: 2015-06-30T10:25:40Z
Add proper tests for Stream Operators
commit 16dcb7ee96a0c6edd4549fe3f42073681dc81493
Author: Aljoscha Krettek
Date: 2015-06-30T14:14:25Z
Ignore KafkaITCase.testPersistentSourceWithOffsetUpdates
commit 7416eff02319b1eb695fa6e2986b93d9b51f2d05
Author: Aljoscha Krettek
Date: 2015-06-30T15:10:05Z
[FLINK-2301] Fix Behaviour of BarrierBuffer and add Tests
----
> Introduce (Event)time in Streaming
> ----------------------------------
>
> Key: FLINK-1967
> URL: https://issues.apache.org/jira/browse/FLINK-1967
> Project: Flink
> Issue Type: Improvement
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> This requires introducing a timestamp in streaming record and a change in the
> sources to add timestamps to records. This will also introduce punctuations
> (or low watermarks) to allow windows to work correctly on unordered,
> timestamped input data. In the process of this, the windowing subsystem also
> needs to be adapted to use the punctuations. Furthermore, all operators need
> to be made aware of punctuations and correctly forward them. Then, a new
> operator must be introduced to allow modification of timestamps.