Github user fhueske commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4020#discussion_r119226503
  
    --- Diff: docs/dev/table/streaming.md ---
    @@ -39,17 +39,293 @@ Dynamic table
     Time Attributes
     ---------------
     
    -### Event-time
    +Flink supports different notions of *time* in streaming programs.
     
    -* DataStream: Timestamps & WMs required, `.rowtime` (replace attribute or extend schema)
    -* TableSource: Timestamps & WMs & DefinedRowtimeAttribute
    +- *Processing time* refers to the system time of the machine that is executing the respective operation (also known as "wall-clock time").
    +- *Event time* is the time at which each individual event occurred on its producing device.
    +- *Ingestion time* is the time at which events enter Flink; internally, it is treated similarly to event time.
     
    -{% top %}
    +For more information about time handling in Flink, see the introduction about [Event Time and Watermarks]({{ site.baseurl }}/dev/event_time.html).
    +
    +Table programs assume that a corresponding time characteristic has been specified for the streaming environment:
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val env = StreamExecutionEnvironment.getExecutionEnvironment
    +
    +env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default
    +
    +// alternatively:
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
    +// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +Time-based operations such as [windows]({{ site.baseurl }}/dev/table/tableApi.html) in both the Table API and SQL require information about the notion of time and its origin. Therefore, tables can offer *logical time attributes* for indicating time and accessing the corresponding timestamps in table programs.
    +
    +Time attributes can be part of every table schema. They are defined when converting a `DataStream` into a table or are pre-defined when using a `TableSource`. Once a time attribute has been defined, it can be referenced as a field and used in time-based operations.
    +
    +As long as a time attribute is not modified and is simply forwarded from one part of a query to another, it remains a valid time attribute. Time attributes behave like regular timestamps and can be accessed in calculations. However, if a time attribute is used in a calculation, it is materialized and becomes a regular timestamp. Regular timestamps do not cooperate with Flink's time and watermarking system and thus can no longer be used in time-based operations.
     
     ### Processing time
     
    -* DataStream: `.proctime` (only extend schema)
    -* TableSource: DefinedProctimeAttribute
    +Processing time allows a table program to produce results based on the time of the local machine. It is the simplest notion of time but does not provide determinism. It requires neither timestamp extraction nor watermark generation.
    +
    +There are two ways to define a processing time attribute.
    +
    +#### During DataStream-to-Table Conversion
    +
    +The processing time attribute is defined with the `.proctime` property during schema definition. The time attribute must only extend the physical schema by an additional logical field. Thus, it can only be defined at the end of the schema definition.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +DataStream<Tuple2<String, String>> stream = ...;
    +
    +// declare an additional logical field as processing time attribute
    +Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");
    +
    +WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +val stream: DataStream[(String, String)] = ...
    +
    +// declare an additional logical field as processing time attribute
    +val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.proctime)
    +
    +val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +#### Using a TableSource
    +
    +The processing time attribute is defined by a `TableSource` that implements the `DefinedProctimeAttribute` interface. The logical time attribute is appended to the physical schema defined by the return type of the `TableSource`.
    +
    +<div class="codetabs" markdown="1">
    +<div data-lang="java" markdown="1">
    +{% highlight java %}
    +// define a table source with a processing time attribute
    +public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {
    +
    +   @Override
    +   public TypeInformation<Row> getReturnType() {
    +      String[] names = new String[] {"Username", "Data"};
    +      TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
    +      return Types.ROW(names, types);
    +   }
    +
    +   @Override
    +   public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
    +      // create stream
    +      DataStream<Row> stream = ...;
    +      return stream;
    +   }
    +
    +   @Override
    +   public String getProctimeAttribute() {
    +      // field with this name will be appended as a third field
    +      return "UserActionTime";
    +   }
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource());
    +
    +WindowedTable windowedTable = tEnv
    +   .scan("UserActions")
    +   .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
    +{% endhighlight %}
    +</div>
    +<div data-lang="scala" markdown="1">
    +{% highlight scala %}
    +// define a table source with a processing time attribute
    +class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {
    +
    +   override def getReturnType = {
    +      val names = Array[String]("Username", "Data")
    +      val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
    +      Types.ROW(names, types)
    +   }
    +
    +   override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    +      // create stream
    +      val stream = ...
    +      stream
    +   }
    +
    +   override def getProctimeAttribute = {
    +      // field with this name will be appended as a third field
    +      "UserActionTime"
    +   }
    +}
    +
    +// register table source
    +tEnv.registerTableSource("UserActions", new UserActionSource)
    +
    +val windowedTable = tEnv
    +   .scan("UserActions")
    +   .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)
    +{% endhighlight %}
    +</div>
    +</div>
    +
    +### Event time
    +
    +Event time allows a table program to produce results based on the time that is contained in every record. This allows for consistent results even in the case of out-of-order or late events. It also ensures replayable results of a table program when reading records from persistent storage.
    +
    +Additionally, event time allows for a unified syntax for table programs in both batch and streaming environments. A time attribute in a streaming environment can be a regular field of a record in a batch environment.
    +
    +In order to handle out-of-order events and to distinguish between on-time and late events in streaming, Flink needs to extract timestamps from events and to track the progress of time (with so-called [watermarks]({{ site.baseurl }}/dev/event_time.html)).
    +
    +The Table API & SQL assume that timestamps and watermarks have already been generated in the [underlying DataStream API]({{ site.baseurl }}/dev/event_timestamps_watermarks.html). Ideally, this happens within a `TableSource` that has knowledge about the incoming data's characteristics and is hidden from the API end user.
    --- End diff --
    
    I would move the discussion about where timestamps and watermarks are added into the two subsections.
    
    In "DataStream-to-Table Conversion", the timestamps should already be present in the `DataStream` that is converted. In "Using a TableSource", the timestamps and watermarks must be assigned in the stream that is returned by the `getDataStream()` method.
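    
    A rough sketch of what the "Using a TableSource" subsection could show (the extractor choice, the 5-second bound, and the timestamp field index are only placeholders for illustration, not part of this PR):
    
    ```java
    // sketch: assign timestamps and watermarks inside the TableSource,
    // so the returned stream is ready for event-time operations
    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
       // create stream
       DataStream<Row> stream = ...;
       // assign timestamps and watermarks before handing the stream to the Table API
       return stream.assignTimestampsAndWatermarks(
          // hypothetical: tolerate events that are up to 5 seconds out of order
          new BoundedOutOfOrdernessTimestampExtractor<Row>(Time.seconds(5)) {
             @Override
             public long extractTimestamp(Row row) {
                // hypothetical: the event timestamp is stored in the third field
                return (Long) row.getField(2);
             }
          });
    }
    ```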

