[
https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16052154#comment-16052154
]
Fabian Hueske edited comment on FLINK-6886 at 6/16/17 5:29 PM:
---------------------------------------------------------------
You are right. The problem is in {{translate(dataStreamPlan,
relNode.getRowType, queryConfig, withChangeFlag)}}.
We use the RowType of the original input plan because the field names might
change during optimization (Calcite prunes pure renaming projections as
no-ops). However, the {{RelTimeIndicatorConverter}} (correctly!) changes the
types of time indicators, so the field types of the optimized plan are no
longer identical to those of the original plan. This difference causes the
exception.
A simple solution would be to merge the field names of the original plan
with the field types of the optimized plan and construct a new {{RelDataType}}.
I changed the {{StreamTableEnvironment.translate()}} method to this:
{code}
protected def translate[A](
    table: Table,
    queryConfig: StreamQueryConfig,
    updatesAsRetraction: Boolean,
    withChangeFlag: Boolean)(implicit tpe: TypeInformation[A]): DataStream[A] = {

  val relNode = table.getRelNode
  val dataStreamPlan = optimize(relNode, updatesAsRetraction)

  // zip original field names with optimized field types
  val fields = relNode.getRowType.getFieldList.asScala
    .zip(dataStreamPlan.getRowType.getFieldList.asScala)
    // get name of original plan and type of optimized plan
    .map { case (original, optimized) => (original.getName, optimized.getType) }
    // add index
    .zipWithIndex
    // build new field types
    .map { case ((name, fieldType), index) => new RelDataTypeFieldImpl(name, index, fieldType) }

  // build a record type from list of field types
  val rowType = new RelRecordType(
    fields.toList.asJava.asInstanceOf[_root_.java.util.List[RelDataTypeField]])

  translate(dataStreamPlan, rowType, queryConfig, withChangeFlag)
}
{code}
With this change, the failing case and all tests pass.
The field merging could be done more elegantly.
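The merge itself (names from the original plan, types from the optimized plan) can be sketched independently of Calcite. The following is only an illustration: {{Field}} and {{MergeRowTypes}} are hypothetical stand-ins rather than Calcite or Flink classes, and the type strings are made up for the example.
{code}
// Hypothetical stand-in for a row-type field; NOT Calcite's RelDataTypeField.
final case class Field(name: String, index: Int, tpe: String)

object MergeRowTypes {

  /** Takes field names from `original` and field types from `optimized`,
    * position by position, and re-numbers the resulting fields. */
  def merge(original: Seq[Field], optimized: Seq[Field]): Seq[Field] = {
    require(
      original.size == optimized.size,
      "both plans must expose the same number of fields")
    original.zip(optimized).zipWithIndex.map {
      case ((orig, opt), idx) => Field(orig.name, idx, opt.tpe)
    }
  }

  def main(args: Array[String]): Unit = {
    // Illustrative values only: the optimizer lost the names and
    // changed the type of the time indicator field.
    val original = Seq(
      Field("name", 0, "VARCHAR"),
      Field("winStart", 1, "TIME_INDICATOR"))
    val optimized = Seq(
      Field("f0", 0, "VARCHAR"),
      Field("f1", 1, "TIMESTAMP"))

    merge(original, optimized).foreach(println)
  }
}
{code}
The size check makes the positional assumption explicit: the merge is only meaningful if optimization preserves the number and order of the output fields.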
was (Author: fhueske):
You are right. The problem is in {{translate(dataStreamPlan,
relNode.getRowType, queryConfig, withChangeFlag)}}
> Fix Timestamp field can not be selected in event time case when
> toDataStream[T], `T` not a `Row` Type.
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-6886
> URL: https://issues.apache.org/jira/browse/FLINK-6886
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Currently, for event-time windows (group/over), when the `SELECT` clause
> contains a `Timestamp` field and the table is converted with toDataStream[T]
> where `T` is not a `Row` type (such as a `PojoType`), an exception is thrown.
> This JIRA fixes that bug. For example:
> Group Window on SQL:
> {code}
> SELECT name, max(num) as myMax, TUMBLE_START(rowtime, INTERVAL '5' SECOND) as
> winStart,TUMBLE_END(rowtime, INTERVAL '5' SECOND) as winEnd FROM T1 GROUP BY
> name, TUMBLE(rowtime, INTERVAL '5' SECOND)
> {code}
> Thrown exception:
> {code}
> org.apache.flink.table.api.TableException: The field types of physical and
> logical row types do not match.This is a bug and should not happen. Please
> file an issue.
> at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
> at
> org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721)
> at
> org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647)
> {code}
> In fact, once this exception is resolved, further exceptions are thrown.
> The root cause is a bug in the
> {{TableEnvironment#generateRowConverterFunction}} method, which this JIRA
> will fix.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)