GitHub user fhueske commented on a diff in the pull request:
https://github.com/apache/flink/pull/4532#discussion_r132854238
--- Diff:
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
---
@@ -667,30 +719,62 @@ abstract class StreamTableEnvironment(
// get CRow plan
val plan: DataStream[CRow] = translateToCRow(logicalPlan, queryConfig)
+ val rowtimeFields = logicalType
+ .getFieldList.asScala
+ .filter(f => FlinkTypeFactory.isRowtimeIndicatorType(f.getType))
+
+ // convert the input type for the conversion mapper
+ // the input will be changed in the OutputRowtimeProcessFunction later
+ val convType = if (rowtimeFields.size > 1) {
+ throw new TableException(
--- End diff --
I think we should avoid implicit defaults, such as using the timestamp
attribute of the leftmost table (the leftmost table might not have a time
indicator attribute, and join order optimization could change the order of
tables), as well as special cases for queries like `SELECT *`.
When a `Table` is converted into a `DataStream`, the resulting stream is
likely to be processed further by logic that cannot be expressed in SQL or the
Table API. If a `Table` has multiple timestamp attributes, IMO the user should
be forced to choose which one becomes the `StreamRecord` timestamp, because
the semantics of any subsequent time-based operation depend on that choice. I
see two ways to do that:
- ensure that only one attribute is a time indicator by casting the others
to `TIMESTAMP`
- let the user specify which field should be used as the timestamp via an
additional parameter of the `toAppendStream` and `toRetractStream` methods.
We could also do both.
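To make the two options concrete, here is a minimal sketch of the selection rule in plain Scala. It is not Flink code: `Field`, `RowtimeSelection`, `resolveTimestampField` and the `userChoice` parameter are hypothetical names made up for illustration, and `IllegalArgumentException` stands in for whatever exception the real implementation would throw.

```scala
// Hypothetical model of a table field; NOT part of Flink's API.
final case class Field(name: String, isRowtimeIndicator: Boolean)

object RowtimeSelection {

  /**
   * Decides which field would provide the StreamRecord timestamp.
   *
   * @param fields     all fields of the (hypothetical) table schema
   * @param userChoice optional field name chosen explicitly by the user
   * @return the selected rowtime field, or None if the table has none
   */
  def resolveTimestampField(
      fields: Seq[Field],
      userChoice: Option[String] = None): Option[Field] = {

    val rowtimeFields = fields.filter(_.isRowtimeIndicator)

    userChoice match {
      // Explicit choice: it must name one of the rowtime indicator fields.
      case Some(name) =>
        rowtimeFields.find(_.name == name) match {
          case found @ Some(_) => found
          case None =>
            throw new IllegalArgumentException(
              s"Field '$name' is not a rowtime indicator attribute.")
        }

      // No choice but several candidates: fail instead of guessing a default.
      case None if rowtimeFields.size > 1 =>
        throw new IllegalArgumentException(
          "Found more than one rowtime field: " +
            rowtimeFields.map(_.name).mkString("[", ", ", "]") +
            ". Cast all but one to TIMESTAMP or name the field explicitly.")

      // Zero or one candidate: nothing ambiguous to resolve.
      case None => rowtimeFields.headOption
    }
  }
}
```

Under these assumptions, a schema with two rowtime fields is rejected unless the caller names one of them, while a schema in which all but one rowtime field has been cast to `TIMESTAMP` passes without an explicit choice.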
I agree with @wuchong that we do not need this restriction when we emit a
`Table` to a `TableSink`.