alpinegizmo commented on a change in pull request #14292:
URL: https://github.com/apache/flink/pull/14292#discussion_r535987359
##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
under the License.
-->
-Flink is able to process streaming data based on different notions of *time*.
+Flink can process data based on different notions of time.
-- *Processing time* refers to the system time of the machine (also known as
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps
which are attached to each row. The timestamps can encode when an event
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as
"wall-clock time") that is executing the respective operation.
Review comment:
```suggestion
- *Processing time* refers to the machine's system time (also known as
"wall-clock time") that is executing the respective operation.
```
##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
under the License.
-->
-Flink is able to process streaming data based on different notions of *time*.
+Flink can process data based on different notions of time.
-- *Processing time* refers to the system time of the machine (also known as
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps
which are attached to each row. The timestamps can encode when an event
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps
that are attached to each row. The timestamps can encode when an event happened.
-For more information about time handling in Flink, see the introduction about
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about
[event time and watermarks]({% link dev/event_time.md %}).
-This page explains how time attributes can be defined for time-based
operations in Flink's Table API & SQL.
* This will be replaced by the TOC
{:toc}
Introduction to Time Attributes
-------------------------------
-Time-based operations such as windows in both the [Table API]({% link
dev/table/tableApi.md %}#group-windows) and [SQL]({% link
dev/table/sql/queries.md %}#group-windows) require information about the notion
of time and its origin. Therefore, tables can offer *logical time attributes*
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a
`DataStream`.
+Once a time attribute is defined, it can be referenced as a field and used in
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the
query to another, it remains valid.
Review comment:
```suggestion
As long as a time attribute is not modified, and is simply forwarded from
one part of a query to another, it remains a valid time attribute.
```
##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
under the License.
-->
-Flink is able to process streaming data based on different notions of *time*.
+Flink can process data based on different notions of time.
-- *Processing time* refers to the system time of the machine (also known as
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps
which are attached to each row. The timestamps can encode when an event
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps
that are attached to each row. The timestamps can encode when an event happened.
-For more information about time handling in Flink, see the introduction about
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about
[event time and watermarks]({% link dev/event_time.md %}).
-This page explains how time attributes can be defined for time-based
operations in Flink's Table API & SQL.
* This will be replaced by the TOC
{:toc}
Introduction to Time Attributes
-------------------------------
-Time-based operations such as windows in both the [Table API]({% link
dev/table/tableApi.md %}#group-windows) and [SQL]({% link
dev/table/sql/queries.md %}#group-windows) require information about the notion
of time and its origin. Therefore, tables can offer *logical time attributes*
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a
`DataStream`.
+Once a time attribute is defined, it can be referenced as a field and used in
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the
query to another, it remains valid.
+Time attributes behave like regular timestamps and accessible for calculations.
Review comment:
```suggestion
Time attributes behave like regular timestamps, and are accessible for
calculations.
```
##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
under the License.
-->
-Flink is able to process streaming data based on different notions of *time*.
+Flink can process data based on different notions of time.
-- *Processing time* refers to the system time of the machine (also known as
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps
which are attached to each row. The timestamps can encode when an event
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps
that are attached to each row. The timestamps can encode when an event happened.
-For more information about time handling in Flink, see the introduction about
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about
[event time and watermarks]({% link dev/event_time.md %}).
-This page explains how time attributes can be defined for time-based
operations in Flink's Table API & SQL.
* This will be replaced by the TOC
{:toc}
Introduction to Time Attributes
-------------------------------
-Time-based operations such as windows in both the [Table API]({% link
dev/table/tableApi.md %}#group-windows) and [SQL]({% link
dev/table/sql/queries.md %}#group-windows) require information about the notion
of time and its origin. Therefore, tables can offer *logical time attributes*
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a
`DataStream`.
+Once a time attribute is defined, it can be referenced as a field and used in
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the
query to another, it remains valid.
+Time attributes behave like regular timestamps and accessible for calculations.
+When used in computation, time attributes are materialized and act as standard
timestamps.
+However, the opposite is not true; ordinary timestamp columns cannot be
arbitrarily converted to time attributes in the middle of a query.
Review comment:
```suggestion
However, ordinary timestamps cannot be used in place of, or be converted to,
time attributes.
```
##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -318,98 +145,61 @@ val windowedTable = table.window(Tumble over 10.minutes
on $"user_action_time" a
</div>
</div>
-### Using a TableSource
-The event time attribute is defined by a `TableSource` that implements the
`DefinedRowtimeAttributes` interface. The `getRowtimeAttributeDescriptors()`
method returns a list of `RowtimeAttributeDescriptor` for describing the final
name of a time attribute, a timestamp extractor to derive the values of the
attribute, and the watermark strategy associated with the attribute.
+Processing Time
+---------------
+
+Processing time allows a table program to produce results based on the time of
the local machine. It is the simplest notion of time but does not provide
determinism. It neither requires timestamp extraction nor watermark generation.
Review comment:
```suggestion
Processing time allows a table program to produce results based on the time
of the local machine. It is the simplest notion of time, but it will generate
non-deterministic results. Processing time does not require timestamp
extraction or watermark generation.
```
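For context on this suggestion, the `PROCTIME()` computed-column approach that the reworked docs describe looks roughly like this (a minimal sketch; the table/column names follow the doc's own example, and the `datagen` connector is an illustrative stand-in):

```sql
-- Minimal sketch of a processing-time attribute declared in DDL.
-- Table/column names mirror the doc's example; 'datagen' is illustrative.
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time AS PROCTIME()  -- computed column: processing-time attribute
) WITH (
  'connector' = 'datagen'
);
```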
##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
under the License.
-->
-Flink is able to process streaming data based on different notions of *time*.
+Flink can process data based on different notions of time.
-- *Processing time* refers to the system time of the machine (also known as
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps
which are attached to each row. The timestamps can encode when an event
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps
that are attached to each row. The timestamps can encode when an event happened.
-For more information about time handling in Flink, see the introduction about
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about
[event time and watermarks]({% link dev/event_time.md %}).
-This page explains how time attributes can be defined for time-based
operations in Flink's Table API & SQL.
* This will be replaced by the TOC
{:toc}
Introduction to Time Attributes
-------------------------------
-Time-based operations such as windows in both the [Table API]({% link
dev/table/tableApi.md %}#group-windows) and [SQL]({% link
dev/table/sql/queries.md %}#group-windows) require information about the notion
of time and its origin. Therefore, tables can offer *logical time attributes*
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a
`DataStream`.
+Once a time attribute is defined, it can be referenced as a field and used in
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the
query to another, it remains valid.
+Time attributes behave like regular timestamps and accessible for calculations.
+When used in computation, time attributes are materialized and act as standard
timestamps.
+However, the opposite is not true; ordinary timestamp columns cannot be
arbitrarily converted to time attributes in the middle of a query.
-Time attributes can be part of every table schema. They are defined when
creating a table from a CREATE TABLE DDL or a `DataStream` or are pre-defined
when using a `TableSource`. Once a time attribute has been defined at the
beginning, it can be referenced as a field and can be used in time-based
operations.
-
-As long as a time attribute is not modified and is simply forwarded from one
part of the query to another, it remains a valid time attribute. Time
attributes behave like regular timestamps and can be accessed for calculations.
If a time attribute is used in a calculation, it will be materialized and
becomes a regular timestamp. Regular timestamps do not cooperate with Flink's
time and watermarking system and thus can not be used for time-based operations
anymore.
-
-Table programs require that the corresponding time characteristic has been
specified for the streaming environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-
-env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime) # default
-
-# alternatively:
-# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
-# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-Processing time
----------------
-
-Processing time allows a table program to produce results based on the time of
the local machine. It is the simplest notion of time but does not provide
determinism. It neither requires timestamp extraction nor watermark generation.
-
-There are three ways to define a processing time attribute.
-
-### Defining in create table DDL
-
-The processing time attribute is defined as a computed column in create table
DDL using the system `PROCTIME()` function. Please see [CREATE TABLE DDL]({%
link dev/table/sql/create.md %}#create-table) for more information about
computed column.
-
-{% highlight sql %}
-
-CREATE TABLE user_actions (
- user_name STRING,
- data STRING,
- user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
-) WITH (
- ...
-);
-
-SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT
user_name)
-FROM user_actions
-GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
-
-{% endhighlight %}
-
-
-### During DataStream-to-Table Conversion
-
-The processing time attribute is defined with the `.proctime` property during
schema definition. The time attribute must only extend the physical schema by
an additional logical field. Thus, it can only be defined at the end of the
schema definition.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple2<String, String>> stream = ...;
-
-// declare an additional logical field as a processing time attribute
-Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"),
$("user_action_time").proctime());
-
-WindowedTable windowedTable = table.window(
- Tumble.over(lit(10).minutes())
- .on($("user_action_time"))
- .as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[(String, String)] = ...
-
-// declare an additional logical field as a processing time attribute
-val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name",
$"data", $"user_action_time".proctime)
-
-val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time"
as "userActionWindow")
-{% endhighlight %}
-</div>
-</div>
-
-### Using a TableSource
-
-The processing time attribute is defined by a `TableSource` that implements
the `DefinedProctimeAttribute` interface. The logical time attribute is
appended to the physical schema defined by the return type of the `TableSource`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// define a table source with a processing attribute
-public class UserActionSource implements StreamTableSource<Row>,
DefinedProctimeAttribute {
-
- @Override
- public TypeInformation<Row> getReturnType() {
- String[] names = new String[] {"user_name" , "data"};
- TypeInformation[] types = new TypeInformation[]
{Types.STRING(), Types.STRING()};
- return Types.ROW(names, types);
- }
-
- @Override
- public DataStream<Row> getDataStream(StreamExecutionEnvironment
execEnv) {
- // create stream
- DataStream<Row> stream = ...;
- return stream;
- }
-
- @Override
- public String getProctimeAttribute() {
- // field with this name will be appended as a third field
- return "user_action_time";
- }
-}
-
-// register table source
-tEnv.registerTableSource("user_actions", new UserActionSource());
-
-WindowedTable windowedTable = tEnv
- .from("user_actions")
- .window(Tumble
- .over(lit(10).minutes())
- .on($("user_action_time"))
- .as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// define a table source with a processing attribute
-class UserActionSource extends StreamTableSource[Row] with
DefinedProctimeAttribute {
-
- override def getReturnType = {
- val names = Array[String]("user_name" , "data")
- val types = Array[TypeInformation[_]](Types.STRING,
Types.STRING)
- Types.ROW(names, types)
- }
-
- override def getDataStream(execEnv: StreamExecutionEnvironment):
DataStream[Row] = {
- // create stream
- val stream = ...
- stream
- }
-
- override def getProctimeAttribute = {
- // field with this name will be appended as a third field
- "user_action_time"
- }
-}
-
-// register table source
-tEnv.registerTableSource("user_actions", new UserActionSource)
-
-val windowedTable = tEnv
- .from("user_actions")
- .window(Tumble over 10.minutes on $"user_action_time" as
"userActionWindow")
-{% endhighlight %}
-</div>
-</div>
-
-Event time
+Event Time
----------
-Event time allows a table program to produce results based on the time that is
contained in every record. This allows for consistent results even in case of
out-of-order events or late events. It also ensures replayable results of the
table program when reading records from persistent storage.
+Event time allows a table program to produce results based on timestamps in
every record, allowing for consistent results even in case of out-of-order
events or late events. It also ensures the replayability of the results of the
table program when reading records from persistent storage.
-Additionally, event time allows for unified syntax for table programs in both
batch and streaming environments. A time attribute in a streaming environment
can be a regular field of a record in a batch environment.
+Additionally, event time allows for unified syntax for table programs in both
batch and streaming environments. A time attribute in a streaming environment
can be a regular column of a row in a batch environment.
-In order to handle out-of-order events and distinguish between on-time and
late events in streaming, Flink needs to extract timestamps from events and
make some kind of progress in time (so-called [watermarks]({% link
dev/event_time.md %})).
+To handle out-of-order events and distinguish between on-time and late events
in streaming, Flink needs to extract timestamps from events and make some
progress in time (so-called [watermarks]({% link dev/event_time.md %})).
Review comment:
```suggestion
To handle out-of-order events and to distinguish between on-time and late
events in streaming, Flink needs to know the timestamp for each row, and it
also needs regular indications of how far along in event time the processing
has progressed so far (via so-called [watermarks]({% link dev/event_time.md
%})).
```
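As a point of reference for the suggested wording about watermarks, an event-time attribute and its watermark strategy are declared together in DDL along these lines (a minimal sketch; the 5-second out-of-orderness bound and the `datagen` connector are illustrative choices, not from the PR):

```sql
-- Minimal sketch: event-time attribute plus watermark strategy in DDL.
-- The 5-second bound and 'datagen' connector are illustrative assumptions.
CREATE TABLE user_actions (
  user_name STRING,
  data STRING,
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'datagen'
);
```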
##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
under the License.
-->
-Flink is able to process streaming data based on different notions of *time*.
+Flink can process data based on different notions of time.
-- *Processing time* refers to the system time of the machine (also known as
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps
which are attached to each row. The timestamps can encode when an event
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps
that are attached to each row. The timestamps can encode when an event happened.
-For more information about time handling in Flink, see the introduction about
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about
[event time and watermarks]({% link dev/event_time.md %}).
-This page explains how time attributes can be defined for time-based
operations in Flink's Table API & SQL.
* This will be replaced by the TOC
{:toc}
Introduction to Time Attributes
-------------------------------
-Time-based operations such as windows in both the [Table API]({% link
dev/table/tableApi.md %}#group-windows) and [SQL]({% link
dev/table/sql/queries.md %}#group-windows) require information about the notion
of time and its origin. Therefore, tables can offer *logical time attributes*
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a
`DataStream`.
+Once a time attribute is defined, it can be referenced as a field and used in
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the
query to another, it remains valid.
+Time attributes behave like regular timestamps and accessible for calculations.
+When used in computation, time attributes are materialized and act as standard
timestamps.
+However, the opposite is not true; ordinary timestamp columns cannot be
arbitrarily converted to time attributes in the middle of a query.
-Time attributes can be part of every table schema. They are defined when
creating a table from a CREATE TABLE DDL or a `DataStream` or are pre-defined
when using a `TableSource`. Once a time attribute has been defined at the
beginning, it can be referenced as a field and can be used in time-based
operations.
-
-As long as a time attribute is not modified and is simply forwarded from one
part of the query to another, it remains a valid time attribute. Time
attributes behave like regular timestamps and can be accessed for calculations.
If a time attribute is used in a calculation, it will be materialized and
becomes a regular timestamp. Regular timestamps do not cooperate with Flink's
time and watermarking system and thus can not be used for time-based operations
anymore.
-
-Table programs require that the corresponding time characteristic has been
specified for the streaming environment:
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val env = StreamExecutionEnvironment.getExecutionEnvironment
-
-env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
-
-// alternatively:
-// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
-// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-<div data-lang="python" markdown="1">
-{% highlight python %}
-env = StreamExecutionEnvironment.get_execution_environment()
-
-env.set_stream_time_characteristic(TimeCharacteristic.ProcessingTime) # default
-
-# alternatively:
-# env.set_stream_time_characteristic(TimeCharacteristic.IngestionTime)
-# env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-{% endhighlight %}
-</div>
-</div>
-
-Processing time
----------------
-
-Processing time allows a table program to produce results based on the time of
the local machine. It is the simplest notion of time but does not provide
determinism. It neither requires timestamp extraction nor watermark generation.
-
-There are three ways to define a processing time attribute.
-
-### Defining in create table DDL
-
-The processing time attribute is defined as a computed column in create table
DDL using the system `PROCTIME()` function. Please see [CREATE TABLE DDL]({%
link dev/table/sql/create.md %}#create-table) for more information about
computed column.
-
-{% highlight sql %}
-
-CREATE TABLE user_actions (
- user_name STRING,
- data STRING,
- user_action_time AS PROCTIME() -- declare an additional field as a processing time attribute
-) WITH (
- ...
-);
-
-SELECT TUMBLE_START(user_action_time, INTERVAL '10' MINUTE), COUNT(DISTINCT
user_name)
-FROM user_actions
-GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
-
-{% endhighlight %}
-
-
-### During DataStream-to-Table Conversion
-
-The processing time attribute is defined with the `.proctime` property during
schema definition. The time attribute must only extend the physical schema by
an additional logical field. Thus, it can only be defined at the end of the
schema definition.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-DataStream<Tuple2<String, String>> stream = ...;
-
-// declare an additional logical field as a processing time attribute
-Table table = tEnv.fromDataStream(stream, $("user_name"), $("data"),
$("user_action_time").proctime());
-
-WindowedTable windowedTable = table.window(
- Tumble.over(lit(10).minutes())
- .on($("user_action_time"))
- .as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-val stream: DataStream[(String, String)] = ...
-
-// declare an additional logical field as a processing time attribute
-val table = tEnv.fromDataStream(stream, $"UserActionTimestamp", $"user_name",
$"data", $"user_action_time".proctime)
-
-val windowedTable = table.window(Tumble over 10.minutes on $"user_action_time"
as "userActionWindow")
-{% endhighlight %}
-</div>
-</div>
-
-### Using a TableSource
-
-The processing time attribute is defined by a `TableSource` that implements
the `DefinedProctimeAttribute` interface. The logical time attribute is
appended to the physical schema defined by the return type of the `TableSource`.
-
-<div class="codetabs" markdown="1">
-<div data-lang="java" markdown="1">
-{% highlight java %}
-// define a table source with a processing attribute
-public class UserActionSource implements StreamTableSource<Row>,
DefinedProctimeAttribute {
-
- @Override
- public TypeInformation<Row> getReturnType() {
- String[] names = new String[] {"user_name" , "data"};
- TypeInformation[] types = new TypeInformation[]
{Types.STRING(), Types.STRING()};
- return Types.ROW(names, types);
- }
-
- @Override
- public DataStream<Row> getDataStream(StreamExecutionEnvironment
execEnv) {
- // create stream
- DataStream<Row> stream = ...;
- return stream;
- }
-
- @Override
- public String getProctimeAttribute() {
- // field with this name will be appended as a third field
- return "user_action_time";
- }
-}
-
-// register table source
-tEnv.registerTableSource("user_actions", new UserActionSource());
-
-WindowedTable windowedTable = tEnv
- .from("user_actions")
- .window(Tumble
- .over(lit(10).minutes())
- .on($("user_action_time"))
- .as("userActionWindow"));
-{% endhighlight %}
-</div>
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
-// define a table source with a processing attribute
-class UserActionSource extends StreamTableSource[Row] with
DefinedProctimeAttribute {
-
- override def getReturnType = {
- val names = Array[String]("user_name" , "data")
- val types = Array[TypeInformation[_]](Types.STRING,
Types.STRING)
- Types.ROW(names, types)
- }
-
- override def getDataStream(execEnv: StreamExecutionEnvironment):
DataStream[Row] = {
- // create stream
- val stream = ...
- stream
- }
-
- override def getProctimeAttribute = {
- // field with this name will be appended as a third field
- "user_action_time"
- }
-}
-
-// register table source
-tEnv.registerTableSource("user_actions", new UserActionSource)
-
-val windowedTable = tEnv
- .from("user_actions")
- .window(Tumble over 10.minutes on $"user_action_time" as
"userActionWindow")
-{% endhighlight %}
-</div>
-</div>
-
-Event time
+Event Time
----------
-Event time allows a table program to produce results based on the time that is
contained in every record. This allows for consistent results even in case of
out-of-order events or late events. It also ensures replayable results of the
table program when reading records from persistent storage.
+Event time allows a table program to produce results based on timestamps in
every record, allowing for consistent results even in case of out-of-order
events or late events. It also ensures the replayability of the results of the
table program when reading records from persistent storage.
Review comment:
Can we really argue that results are consistent with late events?
```suggestion
Event time allows a table program to produce results based on timestamps in
every record, allowing for consistent results despite out-of-order or late
events. It also ensures the replayability of the results of the table program
when reading records from persistent storage.
```
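The on-time/late distinction this thread debates can be sketched outside Flink with a toy bounded-out-of-orderness watermark (plain Python, not the Flink API; the 2-unit delay in the usage example is an arbitrary choice):

```python
# Toy model of a bounded-out-of-orderness watermark (not the Flink API).
# The watermark trails the largest timestamp seen so far by a fixed bound;
# an event whose timestamp is at or behind the current watermark is "late".
def classify(timestamps, max_out_of_orderness):
    watermark = float("-inf")
    labels = []
    for ts in timestamps:
        labels.append("late" if ts <= watermark else "on-time")
        watermark = max(watermark, ts - max_out_of_orderness)
    return labels
```

For example, `classify([1, 5, 3, 2, 10], 2)` returns `["on-time", "on-time", "late", "late", "on-time"]`: once the event at timestamp 5 advances the watermark to 3, the out-of-order events at 3 and 2 arrive at or behind it and are classified as late.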
##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -22,213 +22,42 @@ specific language governing permissions and limitations
under the License.
-->
-Flink is able to process streaming data based on different notions of *time*.
+Flink can process data based on different notions of time.
-- *Processing time* refers to the system time of the machine (also known as
"wall-clock time") that is executing the respective operation.
-- *Event time* refers to the processing of streaming data based on timestamps
which are attached to each row. The timestamps can encode when an event
happened.
-- *Ingestion time* is the time that events enter Flink; internally, it is
treated similarly to event time.
+*Processing time* refers to the machine's system time (also known as
"wall-clock time") that is executing the respective operation.
+- *Event time* refers to the processing of streaming data based on timestamps
that are attached to each row. The timestamps can encode when an event happened.
-For more information about time handling in Flink, see the introduction about
[Event Time and Watermarks]({% link dev/event_time.md %}).
+For more information about time handling in Flink, see the introduction about
[event time and watermarks]({% link dev/event_time.md %}).
-This page explains how time attributes can be defined for time-based
operations in Flink's Table API & SQL.
* This will be replaced by the TOC
{:toc}
Introduction to Time Attributes
-------------------------------
-Time-based operations such as windows in both the [Table API]({% link
dev/table/tableApi.md %}#group-windows) and [SQL]({% link
dev/table/sql/queries.md %}#group-windows) require information about the notion
of time and its origin. Therefore, tables can offer *logical time attributes*
for indicating time and accessing corresponding timestamps in table programs.
+Time attributes can be part of every table schema.
+They are defined when creating a table from a `CREATE TABLE DDL` or a
`DataStream`.
+Once a time attribute is defined, it can be referenced as a field and used in
time-based operations.
+As long as a time attribute is not modified and forwarded from one part of the
query to another, it remains valid.
+Time attributes behave like regular timestamps and accessible for calculations.
+When used in computation, time attributes are materialized and act as standard
timestamps.
Review comment:
```suggestion
When used in calculations, time attributes are materialized and act as
standard timestamps.
```
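For context on the `CREATE TABLE DDL` path mentioned in this hunk, a hedged sketch of what declaring an event time attribute in DDL might look like (the table name, fields, and connector options are hypothetical, not from the PR):

```sql
-- Hypothetical table: the WATERMARK clause declares user_action_time as an
-- event time attribute, with 5 seconds of tolerated out-of-orderness
CREATE TABLE user_actions (
  user_name        STRING,
  data             STRING,
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  ...
);
```

Once declared this way, `user_action_time` can be referenced like any other field in time-based operations such as `TUMBLE` windows.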
##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -251,14 +80,12 @@ GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
### During DataStream-to-Table Conversion
-The event time attribute is defined with the `.rowtime` property during schema
definition. [Timestamps and watermarks]({% link dev/event_time.md %}) must have
been assigned in the `DataStream` that is converted.
+When converting a `DataStream` to a table, an event time attribute can be
defined with the `.rowtime` property during schema definition. [Timestamps and
watermarks]({% link dev/event_time.md %}) must have been already assigned in
the `DataStream` being converted.
-There are two ways of defining the time attribute when converting a
`DataStream` into a `Table`. Depending on whether the specified `.rowtime`
field name exists in the schema of the `DataStream` or not, the timestamp field
is either
+There are two ways of defining the time attribute when converting a
`DataStream` into a `Table`. Depending on whether the specified `.rowtime`
field name exists in the schema of the `DataStream`, the timestamp is either
(1) appended as a new column to the schema or
+(2) replaces an existing column.
Review comment:
```suggestion
There are two ways of defining the time attribute when converting a
`DataStream` into a `Table`. Depending on whether the specified `.rowtime`
field name exists in the schema of the `DataStream`, the timestamp is either
(1) appended as a new column, or it
(2) replaces an existing column.
```
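A hedged Scala sketch of the two cases the suggestion describes, assuming a `StreamTableEnvironment` named `tEnv` and a `DataStream` named `stream` that already has timestamps and watermarks assigned (names are illustrative only):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._

// (1) "user_action_time" does not exist in the stream's schema,
//     so the rowtime attribute is appended as a new column
val appended = tEnv.fromDataStream(
  stream, $"user_name", $"data", $"user_action_time".rowtime)

// (2) the stream already has a "user_action_time" field,
//     which the rowtime attribute replaces
val replaced = tEnv.fromDataStream(
  stream, $"user_action_time".rowtime, $"user_name", $"data")
```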
##########
File path: docs/dev/table/streaming/time_attributes.md
##########
@@ -251,14 +80,12 @@ GROUP BY TUMBLE(user_action_time, INTERVAL '10' MINUTE);
### During DataStream-to-Table Conversion
-The event time attribute is defined with the `.rowtime` property during schema
definition. [Timestamps and watermarks]({% link dev/event_time.md %}) must have
been assigned in the `DataStream` that is converted.
+When converting a `DataStream` to a table, an event time attribute can be
defined with the `.rowtime` property during schema definition. [Timestamps and
watermarks]({% link dev/event_time.md %}) must have been already assigned in
the `DataStream` being converted.
Review comment:
```suggestion
When converting a `DataStream` to a table, an event time attribute can be
defined with the `.rowtime` property during schema definition. [Timestamps and
watermarks]({% link dev/event_time.md %}) must have already been assigned in
the `DataStream` being converted.
```
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]