[
https://issues.apache.org/jira/browse/FLINK-1967?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14624937#comment-14624937
]
ASF GitHub Bot commented on FLINK-1967:
---------------------------------------
GitHub user aljoscha opened a pull request:
https://github.com/apache/flink/pull/906
[FLINK-1967] Introduce (Event)time in Streaming
## [FLINK-1967] Introduce (Event)time in Streaming
This introduces an additional timestamp field in StreamRecord. When a
SourceFunction is used and an auto-timestamp interval is set via the
ExecutionConfig, the timestamp is automatically set to
System.currentTimeMillis()
upon element emission. The timestamp can be set manually by using an
EventTimeSourceFunction.
This also changes the signature of the StreamOperators. They now get
a StreamRecord instead of the unwrapped value. This is necessary for
them to access the timestamp. Previously, the StreamTask unwrapped the
value from the StreamRecord; this unwrapping is now moved one layer higher.
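To illustrate the signature change, here is a minimal sketch of an operator
that receives the wrapped record and preserves its timestamp. The names
TimestampedRecord, RecordOperator and UppercaseOperator are hypothetical
stand-ins, not the classes in this pull request, and returning the result
directly (rather than emitting it through an output collector) is a
simplifying assumption.

```java
// Hypothetical stand-in for a record that carries a timestamp next to its value.
final class TimestampedRecord<T> {
    private final T value;
    private final long timestamp;

    TimestampedRecord(T value, long timestamp) {
        this.value = value;
        this.timestamp = timestamp;
    }

    T getValue() {
        return value;
    }

    long getTimestamp() {
        return timestamp;
    }

    // Creates a record with a new value but the same timestamp.
    <R> TimestampedRecord<R> withValue(R newValue) {
        return new TimestampedRecord<>(newValue, timestamp);
    }
}

// Hypothetical operator interface: it sees the whole record, not just the value.
interface RecordOperator<IN, OUT> {
    TimestampedRecord<OUT> processElement(TimestampedRecord<IN> element);
}

// Example operator: transforms the value and keeps the input timestamp.
final class UppercaseOperator implements RecordOperator<String, String> {
    @Override
    public TimestampedRecord<String> processElement(TimestampedRecord<String> element) {
        return element.withValue(element.getValue().toUpperCase());
    }
}
```

Because the operator is handed the record itself, it can carry the input
timestamp over to the elements it emits.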
This also introduces watermarks to keep track of processing. At
a configurable interval the sources will emit watermarks that signify
that no records with a lower timestamp will be emitted in the future by
this source. The watermarks are broadcast. Stream inputs wait for watermark
events on all input channels and forward the watermark to the
StreamOperator once the watermark advances on all inputs. Operators are
responsible for forwarding the watermark once they know that no elements
with a previous timestamp will be emitted (this is mostly relevant for
buffering operations such as windows). Right now, all operators simply
forward the watermark they receive.
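The waiting on all input channels described above can be sketched as
tracking the latest watermark per channel and forwarding the minimum once
it advances. This is only an illustration under assumptions: the class
WatermarkAligner and its method are hypothetical names, not the code in
this pull request.

```java
import java.util.Arrays;
import java.util.OptionalLong;

// Hypothetical helper: combines per-channel watermarks into one input watermark.
final class WatermarkAligner {
    private final long[] channelWatermarks;
    private long lastForwarded = Long.MIN_VALUE;

    WatermarkAligner(int numChannels) {
        channelWatermarks = new long[numChannels];
        // No channel has reported a watermark yet.
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Records a watermark from one channel. Returns the combined watermark
    // if it advanced, or an empty result if nothing should be forwarded.
    OptionalLong onWatermark(int channel, long timestamp) {
        if (timestamp > channelWatermarks[channel]) {
            channelWatermarks[channel] = timestamp;
        }
        // The combined watermark is the minimum over all channels.
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        if (min > lastForwarded) {
            lastForwarded = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

With two channels, a watermark of 10 on channel 0 alone forwards nothing;
a following watermark of 5 on channel 1 forwards 5, since that is the
lowest timestamp still possible on any input.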
When using an EventTimeSourceFunction the system does not
automatically emit watermarks; the user is required to emit
them manually using the SourceContext.
No watermarks will be emitted unless
ExecutionConfig.setAutoWatermarkInterval is used to set a watermark
interval.
This commit also contains fixes for other issues that were discovered while
implementing the feature. See Jira issue numbers and descriptions below.
## [FLINK-2290/2295] Fix CoReader Event Handling and Checkpointing
This changes CoReader (now CoStreamingRecordReader) to reuse
UnionGate for the input multiplexing. This way it no longer locks in on
one specific input side and instead reads events from both input sides.
This also enables an event listener for checkpoint barriers so that the
TwoInputTask now reacts to those and correctly forwards them.
Then, this adds CoStreamCheckpointingITCase to verify that checkpointing
works in topologies with TwoInputStreamTasks.
This also adds tests for OneInputStreamTask and TwoInputStreamTask
that check whether:
- open()/close() of RichFunctions are correctly called
- Watermarks are correctly forwarded
- Supersteps/checkpoint barriers are correctly forwarded and the
blocking of inputs works correctly
## Add proper tests for Stream Operators
These test whether:
- open()/close() on RichFunctions are called
- Timestamps of emitted elements match the timestamp of the input
element
- Watermarks are correctly forwarded
## [FLINK-2301] Fix Behaviour of BarrierBuffer and add Tests
Before, a CheckpointBarrier from a more recent Checkpoint would also
trigger unblocking while waiting on an older CheckpointBarrier. Now,
a CheckpointBarrier from a newer checkpoint will unblock all channels
and start a new wait on the new Checkpoint.
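The new behaviour can be sketched as follows. This is a simplified
illustration, not the BarrierBuffer from this pull request: the class
BarrierAligner and its methods are hypothetical, and ignoring barriers
that belong to an older checkpoint is an assumption of the sketch.

```java
import java.util.Arrays;

// Hypothetical helper: tracks which channels are blocked during barrier alignment.
final class BarrierAligner {
    private final boolean[] blocked;
    private long currentCheckpointId = -1;
    private int barriersReceived = 0;
    private boolean aligning = false;

    BarrierAligner(int numChannels) {
        blocked = new boolean[numChannels];
    }

    // Handles a barrier on one channel. Returns true once barriers for the
    // current checkpoint have arrived on all channels.
    boolean onBarrier(int channel, long checkpointId) {
        if (checkpointId > currentCheckpointId) {
            // Newer checkpoint: unblock everything and start a new wait.
            releaseAll();
            currentCheckpointId = checkpointId;
            aligning = true;
        } else if (checkpointId < currentCheckpointId || !aligning) {
            // Assumption: stale or already completed checkpoints are ignored.
            return false;
        }
        if (!blocked[channel]) {
            blocked[channel] = true;
            barriersReceived++;
        }
        if (barriersReceived == blocked.length) {
            releaseAll();
            aligning = false;
            return true;
        }
        return false;
    }

    boolean isBlocked(int channel) {
        return blocked[channel];
    }

    private void releaseAll() {
        Arrays.fill(blocked, false);
        barriersReceived = 0;
    }
}
```

For example, with three channels, a barrier for checkpoint 1 on channel 0
blocks that channel; a barrier for checkpoint 2 on channel 1 then unblocks
channel 0 and starts waiting for checkpoint 2 on all channels.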
The tests for OneInputStreamTask and TwoInputStreamTask check whether
the buffer behaves correctly when receiving CheckpointBarriers from more
recent checkpoints while still waiting on an older CheckpointBarrier.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/aljoscha/flink event-time-in-band
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/906.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #906
----
commit 89add486b51ce83cd23312ba26b79f2f66e3c12a
Author: Aljoscha Krettek <[email protected]>
Date: 2015-06-22T10:26:44Z
[FLINK-1967] Introduce (Event)time in Streaming
commit d7d7ae1b1e6aa4b69eddcb1bd78af06a8cb0508d
Author: Aljoscha Krettek <[email protected]>
Date: 2015-07-13T16:09:54Z
WIP on remove-lock-contention
----
> Introduce (Event)time in Streaming
> ----------------------------------
>
> Key: FLINK-1967
> URL: https://issues.apache.org/jira/browse/FLINK-1967
> Project: Flink
> Issue Type: Improvement
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
>
> This requires introducing a timestamp in streaming record and a change in the
> sources to add timestamps to records. This will also introduce punctuations
> (or low watermarks) to allow windows to work correctly on unordered,
> timestamped input data. In the process of this, the windowing subsystem also
> needs to be adapted to use the punctuations. Furthermore, all operators need
> to be made aware of punctuations and correctly forward them. Then, a new
> operator must be introduced to allow modification of timestamps.