[ 
https://issues.apache.org/jira/browse/FLINK-14042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuval Itzchakov updated FLINK-14042:
------------------------------------
    Description: 
Given the following table called "foo":
{code:java}
SELECT event_time, b, c
FROM X
WHERE event_time >= <START_TIME> AND event_time <END_TIME>{code}
And the following temporal table defined on "foo":
{code:java}
SELECT event_time, b, COLLECT(c) c
FROM foo
GROUP BY event_time, b{code}
I get the following exception:
{code:java}
Exception in thread "main" java.lang.AssertionError: Cannot add expression of different type to set:
set type is RecordType(TIMESTAMP(3) NOT NULL event_time, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" c0, TIMESTAMP(3) NOT NULL event_time0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" MULTISET c) NOT NULL
expression type is RecordType(TIMESTAMP(3) NOT NULL event_time, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" c0, TIMESTAMP(3) NOT NULL event_time0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" MULTISET NOT NULL c) NOT NULL
set is rel#17:LogicalCorrelate.NONE(left=HepRelVertex#15,right=HepRelVertex#16,correlation=$cor0,joinType=inner,requiredColumns={0})
expression is LogicalTemporalTableJoin#23
    at org.apache.calcite.plan.RelOptUtil.verifyTypeEquivalence(RelOptUtil.java:380)
    at org.apache.calcite.plan.hep.HepRuleCall.transformTo(HepRuleCall.java:57)
    at org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234)
    at org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:111)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:280)
    at org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
    at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211)
    at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198)
    at org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
    at org.apache.flink.table.api.TableEnvironment.runHepPlannerSimultaneously(TableEnvironment.scala:344)
    at org.apache.flink.table.api.TableEnvironment.optimizeExpandPlan(TableEnvironment.scala:270)
    at org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:809)
    at org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
    at org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
    at org.apache.flink.table.api.Table.insertInto(table.scala:1148)
{code}
Digging into the table planner, it appears that when the temporal table is 
being registered, it goes through `FlinkTypeFactory.buildLogicalRowType`, which 
uses the following code:
{code:java}
def buildLogicalRowType(
    fieldNames: Seq[String],
    fieldTypes: Seq[TypeInformation[_]])
  : RelDataType = {
  val logicalRowTypeBuilder = builder

  val fields = fieldNames.zip(fieldTypes)
  fields.foreach(f => {
    // time indicators are not nullable
    val nullable = !FlinkTypeFactory.isTimeIndicatorType(f._2)
    logicalRowTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2, nullable))
  })

  logicalRowTypeBuilder.build
}
{code}
We can see here that `nullable` is derived from the `isTimeIndicatorType` method.

On the other hand, when the table that uses the TemporalTableFunction in the 
query is registered, its row type is resolved through 
`FlinkTableFunctionImpl.getRowType`, which doesn't look up the time indicator 
flags at all and marks every field as nullable:
{code:java}
override def getRowType(typeFactory: RelDataTypeFactory,
                        arguments: util.List[AnyRef]): RelDataType = {
  val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
  val builder = flinkTypeFactory.builder
  fieldNames
    .zip(fieldTypes)
    .foreach { f =>
      builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2, 
isNullable = true))
    }
  builder.build
}
{code}
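This creates a mismatch between the schema that was originally registered and 
the schema inferred at the point of use (in the exception above, the two row 
types differ only in field `c`: `MULTISET` vs. `MULTISET NOT NULL`), which 
results in the above exception.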

I haven't tried this for other complex types, but it seems like this would 
happen for any complex type that wasn't nullable to begin with.

 

  was:
Given the following table called "foo":
{code:java}
SELECT event_time, b, c
FROM X
WHERE event_time >= <START_TIME> AND event_time <END_TIME>{code}
And the following temporal table definition defined on "foo":
{code:java}
SELECT a, b, COLLECT(c) c
FROM foo
GROUP BY a, b{code}
I get the following exception:
{code:java}
Exception in thread "main" java.lang.AssertionError: Cannot add expression of 
different type to set:Exception in thread "main" java.lang.AssertionError: 
Cannot add expression of different type to set:set type is 
RecordType(TIMESTAMP(3) NOT NULL event_time, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" c0, TIMESTAMP(3) NOT NULL 
event_time0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
"ISO-8859-1$en_US$primary" b0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
"ISO-8859-1$en_US$primary" MULTISET c) NOT NULLexpression type is 
RecordType(TIMESTAMP(3) NOT NULL event_time, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b, VARCHAR(65536) CHARACTER SET 
"UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" c0, TIMESTAMP(3) NOT NULL 
event_time0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
"ISO-8859-1$en_US$primary" b0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
"ISO-8859-1$en_US$primary" MULTISET NOT NULL c) NOT NULL
set is 
rel#17:LogicalCorrelate.NONE(left=HepRelVertex#15,right=HepRelVertex#16,correlation=$cor0,joinType=inner,requiredColumns={0})expression
 is LogicalTemporalTableJoin#23 at 
org.apache.calcite.plan.RelOptUtil.verifyTypeEquivalence(RelOptUtil.java:380) 
at org.apache.calcite.plan.hep.HepRuleCall.transformTo(HepRuleCall.java:57) at 
org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234) at 
org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:111)
 at 
org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
 at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556) at 
org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415) at 
org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:280) 
at 
org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
 at org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211) 
at org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198) at 
org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
 at 
org.apache.flink.table.api.TableEnvironment.runHepPlannerSimultaneously(TableEnvironment.scala:344)
 at 
org.apache.flink.table.api.TableEnvironment.optimizeExpandPlan(TableEnvironment.scala:270)
 at 
org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:809)
 at 
org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
 at 
org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
 at org.apache.flink.table.api.Table.insertInto(table.scala:1148)
{code}
Digging into the table planner, it appears that when the temporal table is 
being registered, it goes through `FlinkTypeFactory.buildLogicalRowType`, which 
uses the following code:
{code:java}
def buildLogicalRowType(
    fieldNames: Seq[String],
    fieldTypes: Seq[TypeInformation[_]])
  : RelDataType = {
  val logicalRowTypeBuilder = builder

  val fields = fieldNames.zip(fieldTypes)
  fields.foreach(f => {
    // time indicators are not nullable
    val nullable = !FlinkTypeFactory.isTimeIndicatorType(f._2)
    logicalRowTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2, nullable))
  })

  logicalRowTypeBuilder.build
}
{code}
 We can see here that `nullable` is derived from `isTimeIndicatorType` method.

On the other hand, when registering the table that uses the 
TemporalTableFunction in the query, this resolves through 
`FlinkTableFunctionImpl.getRowType`, which doesn't look up the time indicator 
flags at all and sets all fields to be nullable:
{code:java}
override def getRowType(typeFactory: RelDataTypeFactory,
                        arguments: util.List[AnyRef]): RelDataType = {
  val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
  val builder = flinkTypeFactory.builder
  fieldNames
    .zip(fieldTypes)
    .foreach { f =>
      builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2, 
isNullable = true))
    }
  builder.build
}
{code}
This creates a diff between the original schema registered and the inferred 
schema for usage, which results in the above exception.

I haven't tried this for other complex types, but it seems like this should 
happen for any advanced type which wasn't nullable to begin with.

 


> Different RelDataTypes generated for same TemporalTableFunction
> ---------------------------------------------------------------
>
>                 Key: FLINK-14042
>                 URL: https://issues.apache.org/jira/browse/FLINK-14042
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.8.1
>            Reporter: Yuval Itzchakov
>            Priority: Major
>
> Given the following table called "foo":
> {code:java}
> SELECT event_time, b, c
> FROM X
> WHERE event_time >= <START_TIME> AND event_time <END_TIME>{code}
> And the following temporal table defined on "foo":
> {code:java}
> SELECT event_time, b, COLLECT(c) c
> FROM foo
> GROUP BY event_time, b{code}
> I get the following exception:
> {code:java}
> Exception in thread "main" java.lang.AssertionError: Cannot add expression of 
> different type to set:Exception in thread "main" java.lang.AssertionError: 
> Cannot add expression of different type to set:set type is 
> RecordType(TIMESTAMP(3) NOT NULL event_time, VARCHAR(65536) CHARACTER SET 
> "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b, VARCHAR(65536) CHARACTER SET 
> "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" c0, TIMESTAMP(3) NOT NULL 
> event_time0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
> "ISO-8859-1$en_US$primary" b0, VARCHAR(65536) CHARACTER SET "UTF-16LE" 
> COLLATE "ISO-8859-1$en_US$primary" MULTISET c) NOT NULLexpression type is 
> RecordType(TIMESTAMP(3) NOT NULL event_time, VARCHAR(65536) CHARACTER SET 
> "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" b, VARCHAR(65536) CHARACTER SET 
> "UTF-16LE" COLLATE "ISO-8859-1$en_US$primary" c0, TIMESTAMP(3) NOT NULL 
> event_time0, VARCHAR(65536) CHARACTER SET "UTF-16LE" COLLATE 
> "ISO-8859-1$en_US$primary" b0, VARCHAR(65536) CHARACTER SET "UTF-16LE" 
> COLLATE "ISO-8859-1$en_US$primary" MULTISET NOT NULL c) NOT NULL
> set is 
> rel#17:LogicalCorrelate.NONE(left=HepRelVertex#15,right=HepRelVertex#16,correlation=$cor0,joinType=inner,requiredColumns={0})expression
>  is LogicalTemporalTableJoin#23 at 
> org.apache.calcite.plan.RelOptUtil.verifyTypeEquivalence(RelOptUtil.java:380) 
> at org.apache.calcite.plan.hep.HepRuleCall.transformTo(HepRuleCall.java:57) 
> at 
> org.apache.calcite.plan.RelOptRuleCall.transformTo(RelOptRuleCall.java:234) 
> at 
> org.apache.flink.table.plan.rules.logical.LogicalCorrelateToTemporalTableJoinRule.onMatch(LogicalCorrelateToTemporalTableJoinRule.scala:111)
>  at 
> org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:315)
>  at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:556) at 
> org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:415) at 
> org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:280)
>  at 
> org.apache.calcite.plan.hep.HepInstruction$RuleCollection.execute(HepInstruction.java:74)
>  at 
> org.apache.calcite.plan.hep.HepPlanner.executeProgram(HepPlanner.java:211) at 
> org.apache.calcite.plan.hep.HepPlanner.findBestExp(HepPlanner.java:198) at 
> org.apache.flink.table.api.TableEnvironment.runHepPlanner(TableEnvironment.scala:360)
>  at 
> org.apache.flink.table.api.TableEnvironment.runHepPlannerSimultaneously(TableEnvironment.scala:344)
>  at 
> org.apache.flink.table.api.TableEnvironment.optimizeExpandPlan(TableEnvironment.scala:270)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.optimize(StreamTableEnvironment.scala:809)
>  at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:351)
>  at 
> org.apache.flink.table.api.TableEnvironment.insertInto(TableEnvironment.scala:879)
>  at org.apache.flink.table.api.Table.insertInto(table.scala:1148)
> {code}
> Digging into the table planner, it appears that when the temporal table is 
> being registered, it goes through `FlinkTypeFactory.buildLogicalRowType`, 
> which uses the following code:
> {code:java}
> def buildLogicalRowType(
>     fieldNames: Seq[String],
>     fieldTypes: Seq[TypeInformation[_]])
>   : RelDataType = {
>   val logicalRowTypeBuilder = builder
>   val fields = fieldNames.zip(fieldTypes)
>   fields.foreach(f => {
>     // time indicators are not nullable
>     val nullable = !FlinkTypeFactory.isTimeIndicatorType(f._2)
>     logicalRowTypeBuilder.add(f._1, createTypeFromTypeInfo(f._2, nullable))
>   })
>   logicalRowTypeBuilder.build
> }
> {code}
>  We can see here that `nullable` is derived from the `isTimeIndicatorType` 
> method.
> On the other hand, when registering the table that uses the 
> TemporalTableFunction in the query, this resolves through 
> `FlinkTableFunctionImpl.getRowType`, which doesn't look up the time indicator 
> flags at all and sets all fields to be nullable:
> {code:java}
> override def getRowType(typeFactory: RelDataTypeFactory,
>                         arguments: util.List[AnyRef]): RelDataType = {
>   val flinkTypeFactory = typeFactory.asInstanceOf[FlinkTypeFactory]
>   val builder = flinkTypeFactory.builder
>   fieldNames
>     .zip(fieldTypes)
>     .foreach { f =>
>       builder.add(f._1, flinkTypeFactory.createTypeFromTypeInfo(f._2, 
> isNullable = true))
>     }
>   builder.build
> }
> {code}
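> This creates a mismatch between the schema that was originally registered and 
> the schema inferred at the point of use (in the exception above, the two row 
> types differ only in field `c`: `MULTISET` vs. `MULTISET NOT NULL`), which 
> results in the above exception.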
> I haven't tried this for other complex types, but it seems like this would 
> happen for any complex type that wasn't nullable to begin with.
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to