[
https://issues.apache.org/jira/browse/FLINK-6886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050535#comment-16050535
]
Fabian Hueske commented on FLINK-6886:
--------------------------------------
Maybe there's another way to fix this problem. I played around a bit and found
that the following Table API query executes correctly:
{code}
val table = stream.toTable(tEnv, 'l, 'i, 'n, 'proctime.proctime)

val windowedTable = table
  .window(Tumble over 2.seconds on 'proctime as 'w)
  .groupBy('w, 'n)
  .select('n, 'i.count as 'cnt, 'w.start as 's, 'w.end as 'e)

val results = windowedTable.toAppendStream[MP](queryConfig)

// POJO
class MP(var s: Timestamp, var e: Timestamp, var cnt: Long, var n: String) {
  def this() { this(null, null, 0, null) }
  override def toString: String = s"$n,${s.toString},${e.toString},$cnt"
}
{code}
whereas the equivalent SQL query fails with the reported exception ("The field
types of physical and logical row types do not match")
{code}
val sqlTable = tEnv.sql(
  s"""SELECT TUMBLE_START(proctime, INTERVAL '2' SECOND) AS s,
     |       TUMBLE_END(proctime, INTERVAL '2' SECOND) AS e,
     |       n,
     |       COUNT(i) AS cnt
     |FROM $table
     |GROUP BY n, TUMBLE(proctime, INTERVAL '2' SECOND)
     |""".stripMargin)

val results = sqlTable.toAppendStream[MP](queryConfig)
{code}
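For context on why the Table API variant succeeds: conversion to a POJO matches result columns to POJO fields by name rather than by position, which is why MP's constructor order ({{s, e, cnt, n}}) can differ from the select order ({{n, cnt, s, e}}). A minimal plain-Scala sketch of such name-based mapping (illustrative only, not Flink internals; all values are made up):

```scala
// Illustrative sketch of name-based field mapping (not Flink code):
// each POJO field is looked up in the result columns by name, so the
// order of the result columns does not matter for the conversion.
object NameMappingSketch {
  // Result columns as produced by the query, in select order.
  val resultColumns: Map[String, Any] =
    Map("n" -> "a", "cnt" -> 3L, "s" -> "12:00:00.0", "e" -> "12:00:02.0")

  // The POJO's constructor order, which differs from the select order.
  val pojoFieldOrder: Seq[String] = Seq("s", "e", "cnt", "n")

  // Values resolved by name into POJO-constructor order.
  val constructorArgs: Seq[Any] = pojoFieldOrder.map(resultColumns)
}
```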
The plans of both queries look similar, but the SQL plan seems to lack the
correct final projection:
{code}
// Table API plan
== Abstract Syntax Tree ==
LogicalProject(n=[$0], cnt=[AS($1, 'cnt')], s=[AS($2, 's')], e=[AS($3, 'e')])
  LogicalWindowAggregate(group=[{0}], TMP_0=[COUNT($1)])
    LogicalProject(n=[$2], i=[$1], proctime=[$3])
      LogicalTableScan(table=[[_DataStreamTable_0]])

== Optimized Logical Plan ==
DataStreamCalc(select=[n, TMP_0 AS cnt, TMP_1 AS s, TMP_2 AS e])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w, 'proctime, 2000.millis)], select=[n, COUNT(i) AS TMP_0, start('w) AS TMP_1, end('w) AS TMP_2])
    DataStreamCalc(select=[n, i, proctime])
      DataStreamScan(table=[[_DataStreamTable_0]])

// SQL plan
== Abstract Syntax Tree ==
LogicalProject(s=[TUMBLE_START($1)], e=[TUMBLE_END($1)], n=[$0], cnt=[$2])
  LogicalAggregate(group=[{0, 1}], cnt=[COUNT($2)])
    LogicalProject(n=[$2], $f1=[TUMBLE($3, 2000)], i=[$1])
      LogicalTableScan(table=[[UnnamedTable$3]])

== Optimized Logical Plan ==
DataStreamCalc(select=[w$start, w$end, n, cnt])
  DataStreamGroupWindowAggregate(groupBy=[n], window=[TumblingGroupWindow('w$, 'proctime, 2000.millis)], select=[n, COUNT(i) AS cnt, start('w$) AS w$start, end('w$) AS w$end])
    DataStreamCalc(select=[n, proctime, i])
      DataStreamScan(table=[[_DataStreamTable_0]])
{code}
So this doesn't seem to be a fundamental issue with the time attributes or
window properties, but rather an issue in the SQL optimization.
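A toy illustration (plain Scala, not actual Flink internals) of the kind of positional type check that would fail here: the converter compares the logical row's field types against the physical (POJO) field types position by position, so without the final projection the two orders disagree. The field names and types below are assumptions based on the plans above:

```scala
object RowTypeCheckSketch {
  // Illustrative only: a field with a name and a type string.
  case class FieldType(name: String, tpe: String)

  // Positional comparison of logical vs. physical field types; any
  // mismatch triggers an error like the one reported in this issue.
  def checkRowTypes(logical: Seq[FieldType], physical: Seq[FieldType]): Unit = {
    val mismatch = logical.zip(physical).exists { case (l, p) => l.tpe != p.tpe }
    if (logical.size != physical.size || mismatch)
      throw new IllegalStateException(
        "The field types of physical and logical row types do not match.")
  }

  // Without the final projection, the logical row keeps the group key
  // and count first, while the POJO expects the timestamps first:
  val logical: Seq[FieldType] = Seq(
    FieldType("n", "String"), FieldType("cnt", "Long"),
    FieldType("s", "Timestamp"), FieldType("e", "Timestamp"))

  val physical: Seq[FieldType] = Seq(
    FieldType("s", "Timestamp"), FieldType("e", "Timestamp"),
    FieldType("cnt", "Long"), FieldType("n", "String"))
}
```

Calling {{checkRowTypes(logical, physical)}} on these sequences throws, while aligned sequences pass.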
What do you think [~sunjincheng121] and [~jark]?
> Fix Timestamp field can not be selected in event time case when
> toDataStream[T], `T` not a `Row` Type.
> -------------------------------------------------------------------------------------------------------
>
> Key: FLINK-6886
> URL: https://issues.apache.org/jira/browse/FLINK-6886
> Project: Flink
> Issue Type: Bug
> Components: Table API & SQL
> Affects Versions: 1.4.0
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> Currently, for event-time windows (group/over), when the `SELECT` clause
> contains a `Timestamp` field and the result is converted via toDataStream[T]
> where `T` is not a `Row` type (e.g. a `PojoType`), an exception is thrown.
> This JIRA will fix that bug. For example:
> Group Window on SQL:
> {code}
> SELECT name, MAX(num) AS myMax,
>   TUMBLE_START(rowtime, INTERVAL '5' SECOND) AS winStart,
>   TUMBLE_END(rowtime, INTERVAL '5' SECOND) AS winEnd
> FROM T1
> GROUP BY name, TUMBLE(rowtime, INTERVAL '5' SECOND)
> {code}
> Throw Exception:
> {code}
> org.apache.flink.table.api.TableException: The field types of physical and
> logical row types do not match.This is a bug and should not happen. Please
> file an issue.
> at org.apache.flink.table.api.TableException$.apply(exceptions.scala:53)
> at
> org.apache.flink.table.api.TableEnvironment.generateRowConverterFunction(TableEnvironment.scala:721)
> at
> org.apache.flink.table.api.StreamTableEnvironment.getConversionMapper(StreamTableEnvironment.scala:247)
> at
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:647)
> {code}
> In fact, when we solve this exception, subsequent exceptions will be thrown.
> The real cause is a bug in the
> {{TableEnvironment#generateRowConverterFunction}} method, so this JIRA will
> fix it there.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)